You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:53 UTC

[airflow] 12/27: Consider previous run in CronTriggerTimetable (#28532)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 348994550623a5fae6b5cb25a3be6ecc033cbad0
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Dec 26 17:30:02 2022 +0800

    Consider previous run in CronTriggerTimetable (#28532)
    
    (cherry picked from commit 6dc28fb0278c1bdb096b75b6e19acbcb1019db02)
---
 airflow/timetables/trigger.py              | 21 ++++++++---------
 tests/timetables/test_trigger_timetable.py | 36 ++++++++++++++++++++++++------
 2 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index 86ab58cc4d..7807542da5 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -82,18 +82,19 @@ class CronTriggerTimetable(CronMixin, Timetable):
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
         if restriction.catchup:
-            if last_automated_data_interval is None:
-                if restriction.earliest is None:
-                    return None
-                next_start_time = self._align_to_next(restriction.earliest)
-            else:
+            if last_automated_data_interval is not None:
                 next_start_time = self._get_next(last_automated_data_interval.end)
-        else:
-            current_time = DateTime.utcnow()
-            if restriction.earliest is not None and current_time < restriction.earliest:
-                next_start_time = self._align_to_next(restriction.earliest)
+            elif restriction.earliest is None:
+                return None  # Don't know where to catch up from, give up.
             else:
-                next_start_time = self._align_to_next(current_time)
+                next_start_time = self._align_to_next(restriction.earliest)
+        else:
+            start_time_candidates = [self._align_to_next(DateTime.utcnow())]
+            if last_automated_data_interval is not None:
+                start_time_candidates.append(self._get_next(last_automated_data_interval.end))
+            if restriction.earliest is not None:
+                start_time_candidates.append(self._align_to_next(restriction.earliest))
+            next_start_time = max(start_time_candidates)
         if restriction.latest is not None and restriction.latest < next_start_time:
             return None
         return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py
index 3e3c97a5e0..cabb1198ef 100644
--- a/tests/timetables/test_trigger_timetable.py
+++ b/tests/timetables/test_trigger_timetable.py
@@ -44,12 +44,29 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
 
 
 @pytest.mark.parametrize(
-    "last_automated_data_interval",
-    [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL_EXACT, id="subsequent")],
+    "last_automated_data_interval, next_start_time",
+    [
+        pytest.param(
+            None,
+            CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+            id="first-run",
+        ),
+        pytest.param(
+            PREV_DATA_INTERVAL_EXACT,
+            CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+            id="before-now",
+        ),
+        pytest.param(
+            DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT),
+            CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT,
+            id="after-now",
+        ),
+    ],
 )
 @freezegun.freeze_time(CURRENT_TIME)
 def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
     last_automated_data_interval: DataInterval | None,
+    next_start_time: pendulum.DateTime,
 ) -> None:
     """If ``catchup=False`` and start_date is a day before"""
     timetable = CronTriggerTimetable("30 16 * * *", timezone=TIMEZONE)
@@ -57,8 +74,7 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
         last_automated_data_interval=last_automated_data_interval,
         restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
     )
-    expected = CURRENT_TIME + DELTA_FROM_MIDNIGHT
-    assert next_info == DagRunInfo.exact(expected)
+    assert next_info == DagRunInfo.exact(next_start_time)
 
 
 @pytest.mark.parametrize(
@@ -124,12 +140,18 @@ def test_hourly_cron_trigger_no_catchup_next_info(
             DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
             id="no_last_automated_with_earliest_not_on_boundary",
         ),
+        pytest.param(
+            None,
+            None,
+            None,
+            id="no_last_automated_no_earliest",
+        ),
     ],
 )
 def test_hourly_cron_trigger_catchup_next_info(
-    last_automated_data_interval: DataInterval,
-    earliest: pendulum.DateTime,
-    expected: DagRunInfo,
+    last_automated_data_interval: DataInterval | None,
+    earliest: pendulum.DateTime | None,
+    expected: DagRunInfo | None,
 ) -> None:
     next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info(
         last_automated_data_interval=last_automated_data_interval,