You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/08 00:16:22 UTC
[airflow] 01/07: Fix the incorrect scheduling time for the first run of dag (#21011)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a04ead208da620040b718151c0800bb2ff59bbc4
Author: wano <55...@users.noreply.github.com>
AuthorDate: Mon Feb 7 02:02:57 2022 +0800
Fix the incorrect scheduling time for the first run of dag (#21011)
When catchup_by_default is set to False and the DAG's start_date is the
previous day, the first scheduled run time for this DAG may be incorrect.
Co-authored-by: wanlce <wh...@foxmail.com>
(cherry picked from commit 0bcca55f4881bacc3fbe86f69e71981f5552b398)
---
airflow/timetables/interval.py | 2 +-
tests/timetables/test_interval_timetable.py | 21 +++++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index d669cb6..01fac3a 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -218,7 +218,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
raise AssertionError("next schedule shouldn't be earlier")
if earliest is None:
return new_start
- return max(new_start, earliest)
+ return max(new_start, self._align(earliest))
def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# Get the last complete period before run_after, e.g. if a DAG run is
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 842cc1f2..fe09e0c 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -35,11 +35,32 @@ PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
+YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
+CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
+DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
+
+
+@pytest.mark.parametrize(
+ "last_automated_data_interval",
+ [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_no_catchup_first_starts_at_current_time(
+ last_automated_data_interval: Optional[DataInterval],
+) -> None:
+ """If ``catchup=False`` and start_date is a day before"""
+ next_info = CRON_TIMETABLE.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
+ )
+ expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT
+ assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)
+
@pytest.mark.parametrize(
"timetable",