You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/01/11 21:25:53 UTC
[airflow] 12/27: Consider previous run in CronTriggerTimetable (#28532)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 348994550623a5fae6b5cb25a3be6ecc033cbad0
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Mon Dec 26 17:30:02 2022 +0800
Consider previous run in CronTriggerTimetable (#28532)
(cherry picked from commit 6dc28fb0278c1bdb096b75b6e19acbcb1019db02)
---
airflow/timetables/trigger.py | 21 ++++++++---------
tests/timetables/test_trigger_timetable.py | 36 ++++++++++++++++++++++++------
2 files changed, 40 insertions(+), 17 deletions(-)
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index 86ab58cc4d..7807542da5 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -82,18 +82,19 @@ class CronTriggerTimetable(CronMixin, Timetable):
restriction: TimeRestriction,
) -> DagRunInfo | None:
if restriction.catchup:
- if last_automated_data_interval is None:
- if restriction.earliest is None:
- return None
- next_start_time = self._align_to_next(restriction.earliest)
- else:
+ if last_automated_data_interval is not None:
next_start_time = self._get_next(last_automated_data_interval.end)
- else:
- current_time = DateTime.utcnow()
- if restriction.earliest is not None and current_time < restriction.earliest:
- next_start_time = self._align_to_next(restriction.earliest)
+ elif restriction.earliest is None:
+ return None # Don't know where to catch up from, give up.
else:
- next_start_time = self._align_to_next(current_time)
+ next_start_time = self._align_to_next(restriction.earliest)
+ else:
+ start_time_candidates = [self._align_to_next(DateTime.utcnow())]
+ if last_automated_data_interval is not None:
+ start_time_candidates.append(self._get_next(last_automated_data_interval.end))
+ if restriction.earliest is not None:
+ start_time_candidates.append(self._align_to_next(restriction.earliest))
+ next_start_time = max(start_time_candidates)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.interval(next_start_time - self._interval, next_start_time)
diff --git a/tests/timetables/test_trigger_timetable.py b/tests/timetables/test_trigger_timetable.py
index 3e3c97a5e0..cabb1198ef 100644
--- a/tests/timetables/test_trigger_timetable.py
+++ b/tests/timetables/test_trigger_timetable.py
@@ -44,12 +44,29 @@ DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
@pytest.mark.parametrize(
- "last_automated_data_interval",
- [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL_EXACT, id="subsequent")],
+ "last_automated_data_interval, next_start_time",
+ [
+ pytest.param(
+ None,
+ CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+ id="first-run",
+ ),
+ pytest.param(
+ PREV_DATA_INTERVAL_EXACT,
+ CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+ id="before-now",
+ ),
+ pytest.param(
+ DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT),
+ CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT,
+ id="after-now",
+ ),
+ ],
)
@freezegun.freeze_time(CURRENT_TIME)
def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
last_automated_data_interval: DataInterval | None,
+ next_start_time: pendulum.DateTime,
) -> None:
"""If ``catchup=False`` and start_date is a day before"""
timetable = CronTriggerTimetable("30 16 * * *", timezone=TIMEZONE)
@@ -57,8 +74,7 @@ def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
last_automated_data_interval=last_automated_data_interval,
restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
)
- expected = CURRENT_TIME + DELTA_FROM_MIDNIGHT
- assert next_info == DagRunInfo.exact(expected)
+ assert next_info == DagRunInfo.exact(next_start_time)
@pytest.mark.parametrize(
@@ -124,12 +140,18 @@ def test_hourly_cron_trigger_no_catchup_next_info(
DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
id="no_last_automated_with_earliest_not_on_boundary",
),
+ pytest.param(
+ None,
+ None,
+ None,
+ id="no_last_automated_no_earliest",
+ ),
],
)
def test_hourly_cron_trigger_catchup_next_info(
- last_automated_data_interval: DataInterval,
- earliest: pendulum.DateTime,
- expected: DagRunInfo,
+ last_automated_data_interval: DataInterval | None,
+ earliest: pendulum.DateTime | None,
+ expected: DagRunInfo | None,
) -> None:
next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info(
last_automated_data_interval=last_automated_data_interval,