You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/10 18:33:05 UTC

[airflow] 10/18: Fix the incorrect scheduling time for the first run of dag (#21011)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9e806aa4930cf819b1e3b3f3f8a5237b915fd53f
Author: wano <55...@users.noreply.github.com>
AuthorDate: Mon Feb 7 02:02:57 2022 +0800

    Fix the incorrect scheduling time for the first run of dag (#21011)
    
    When catchup_by_default is set to False and the DAG's start_date is the
    previous day, the first scheduled run time for this DAG may be incorrect.
    
    Co-authored-by: wanlce <wh...@foxmail.com>
    (cherry picked from commit 0bcca55f4881bacc3fbe86f69e71981f5552b398)
---
 airflow/timetables/interval.py              |  2 +-
 tests/timetables/test_interval_timetable.py | 21 +++++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index d669cb6..01fac3a 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -218,7 +218,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
             raise AssertionError("next schedule shouldn't be earlier")
         if earliest is None:
             return new_start
-        return max(new_start, earliest)
+        return max(new_start, self._align(earliest))
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
         # Get the last complete period before run_after, e.g. if a DAG run is
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 842cc1f2..fe09e0c 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -35,11 +35,32 @@ PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
 PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
 
 CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
+YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
 
 HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
 HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
 HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
 
+CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
+DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
+
+
+@pytest.mark.parametrize(
+    "last_automated_data_interval",
+    [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_no_catchup_first_starts_at_current_time(
+    last_automated_data_interval: Optional[DataInterval],
+) -> None:
+    """If ``catchup=False`` and start_date is a day before"""
+    next_info = CRON_TIMETABLE.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
+    )
+    expected_start = YESTERDAY + DELTA_FROM_MIDNIGHT
+    assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)
+
 
 @pytest.mark.parametrize(
     "timetable",