You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2022/08/02 23:25:12 UTC

[airflow] branch main updated: Implement CronTriggerTimetable (#23662)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0048414a6 Implement CronTriggerTimetable (#23662)
d0048414a6 is described below

commit d0048414a6d3bdc282cc738af0185a9a1cd63ef8
Author: Mai Nakagawa <na...@gmail.com>
AuthorDate: Wed Aug 3 08:25:04 2022 +0900

    Implement CronTriggerTimetable (#23662)
    
    Relates #15432
    
    I have also gone through the above discussion and the related documents introduced there:
    - [AIP-39 Richer scheduler_interval](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval)
    - [Scoping out a new feature for 2.1: improving schedule_interval](https://lists.apache.org/thread.html/rb4e004e68574e5fb77ee5b51f4fd5bfb4b3392d884c178bc767681bf%40%3Cdev.airflow.apache.org%3E)
    - [[DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs](https://lists.apache.org/thread.html/rf8eeb06493681f48dc9e82ce605a9c3a930cfee0b4ca19462a4e55b3%40%3Cdev.airflow.apache.org%3E)
    
    The default behavior of [`CronDataIntervalTimetable`](https://github.com/apache/airflow/blob/2.2.5/airflow/timetables/interval.py#L116) is a little bit confusing - a DAG Run is triggered immediately after the DAG is enabled (unpaused), unlike normal cron's behavior.
    
    Hence I'd like to add another cron timetable which does not care about "data interval" and which starts a DAG Run at the start of the period.
    
    I know that even without this PR, each Airflow user can [customize DAG scheduling with Timetables](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#). However, I think this PR is worth adding to the main repository so that users can use the timetable more easily.
---
 airflow/timetables/interval.py              |  92 +++++++++++++++++------
 docs/apache-airflow/concepts/timetable.rst  |  86 ++++++++++++++++++++--
 tests/timetables/test_interval_timetable.py | 109 +++++++++++++++++++++++++++-
 3 files changed, 257 insertions(+), 30 deletions(-)

diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 5ed9cd21d2..4e16ca8b40 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -41,14 +41,6 @@ class _DataIntervalTimetable(Timetable):
     instance), and schedule a DagRun at the end of each interval.
     """
 
-    def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
-        """Bound the earliest time a run can be scheduled.
-
-        This is called when ``catchup=False``. See docstring of subclasses for
-        exact skipping behaviour of a schedule.
-        """
-        raise NotImplementedError()
-
     def _align(self, current: DateTime) -> DateTime:
         """Align given time to the scheduled.
 
@@ -66,6 +58,14 @@ class _DataIntervalTimetable(Timetable):
         """Get the last schedule before the current time."""
         raise NotImplementedError()
 
+    def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
+        """Bound the earliest time a run can be scheduled.
+
+        This is called when ``catchup=False``. See docstring of subclasses for
+        exact skipping behaviour of a schedule.
+        """
+        raise NotImplementedError()
+
     def next_dagrun_info(
         self,
         *,
@@ -114,19 +114,7 @@ def _is_schedule_fixed(expression: str) -> bool:
     return next_b.minute == next_a.minute and next_b.hour == next_a.hour
 
 
-class CronDataIntervalTimetable(_DataIntervalTimetable):
-    """Timetable that schedules data intervals with a cron expression.
-
-    This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
-    a five/six-segment representation, or one of ``cron_presets``.
-
-    The implementation extends on croniter to add timezone awareness. This is
-    because croniter works only with naive timestamps, and cannot consider DST
-    when determining the next/previous time.
-
-    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
-    """
-
+class _CronMixin:
     def __init__(self, cron: str, timezone: Union[str, Timezone]) -> None:
         self._expression = cron_presets.get(cron, cron)
 
@@ -151,14 +139,16 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
     def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
         from airflow.serialization.serialized_objects import decode_timezone
 
-        return cls(data["expression"], decode_timezone(data["timezone"]))
+        # We ignore typing on the next line because mypy expects it to return _CronMixin type.
+        # However, this should return Timetable since it should only be called against a timetable subclass
+        return cls(data["expression"], decode_timezone(data["timezone"]))  # type: ignore
 
     def __eq__(self, other: Any) -> bool:
         """Both expression and timezone should match.
 
         This is only for testing purposes and should not be relied on otherwise.
         """
-        if not isinstance(other, CronDataIntervalTimetable):
+        if not isinstance(other, type(self)):
             return NotImplemented
         return self._expression == other._expression and self._timezone == other._timezone
 
@@ -214,6 +204,20 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
             return next_time
         return current
 
+
+class CronDataIntervalTimetable(_CronMixin, _DataIntervalTimetable):
+    """Timetable that schedules data intervals with a cron expression.
+
+    This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
+    a five/six-segment representation, or one of ``cron_presets``.
+
+    The implementation extends on croniter to add timezone awareness. This is
+    because croniter works only with naive timestamps, and cannot consider DST
+    when determining the next/previous time.
+
+    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
+    """
+
     def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
         """Bound the earliest time a run can be scheduled.
 
@@ -245,6 +249,48 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
         return DataInterval(start=self._get_prev(end), end=end)
 
 
+class CronTriggerTimetable(_CronMixin, Timetable):
+    """A cron-compliant timetable.
+
+    The main difference from ``CronDataIntervalTimetable`` is that the first DAG Run
+    is kicked off at the start of the period like a normal cron, while the first DAG
+    Run of ``CronDataIntervalTimetable`` starts immediately after the DAG is registered.
+
+    Note that this timetable does not care about the idea of *data interval*: the
+    values of ``data_interval_start``, ``data_interval_end`` and legacy
+    ``execution_date`` are the same - the time when a DAG run is triggered.
+
+    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
+    """
+
+    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
+        return DataInterval.exact(run_after)
+
+    def next_dagrun_info(
+        self,
+        *,
+        last_automated_data_interval: Optional[DataInterval],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        if restriction.catchup:
+            if last_automated_data_interval is None:
+                if restriction.earliest is None:
+                    return None
+                next_start_time = self._align(restriction.earliest)
+            else:
+                next_start_time = self._get_next(last_automated_data_interval.end)
+        else:
+            start_from = DateTime.utcnow()
+            if restriction.earliest is not None and start_from < restriction.earliest:
+                start_from = restriction.earliest
+            next_start_time = self._align(start_from)
+            if last_automated_data_interval is not None:  # Never re-schedule the last run's slot.
+                next_start_time = max(next_start_time, self._get_next(last_automated_data_interval.end))
+        if restriction.latest is not None and restriction.latest < next_start_time:
+            return None
+        return DagRunInfo.exact(next_start_time)
+
+
 class DeltaDataIntervalTimetable(_DataIntervalTimetable):
     """Timetable that schedules data intervals with a time delta.
 
diff --git a/docs/apache-airflow/concepts/timetable.rst b/docs/apache-airflow/concepts/timetable.rst
index 986213df81..263afd3912 100644
--- a/docs/apache-airflow/concepts/timetable.rst
+++ b/docs/apache-airflow/concepts/timetable.rst
@@ -26,7 +26,7 @@ directly. The timetable also dictates the data interval and the logical time of
 run created for the DAG.
 
 Cron expressions and timedeltas are still supported (using
-``CronDataIntervalTimetable`` and ``DeltaDataIntervalTimetable`` under the hood
+`CronDataIntervalTimetable`_ and `DeltaDataIntervalTimetable`_ under the hood
 respectively), however, there are situations where they cannot properly express
 the schedule. Some examples are:
 
@@ -55,18 +55,28 @@ Built In Timetables
 Airflow comes with several common timetables built in to cover the most common use cases. Additional timetables
 may be available in plugins.
 
-CronDataIntervalTimetable
-^^^^^^^^^^^^^^^^^^^^^^^^^
+.. _CronTriggerTimetable:
 
-Set schedule based on a cron expression. Can be selected by providing a string that is a valid
-cron expression to the ``schedule_interval`` parameter of a DAG as described in the :doc:`/concepts/dags` documentation.
+CronTriggerTimetable
+^^^^^^^^^^^^^^^^^^^^
+
+A timetable which accepts a cron expression.
+
+Note `CronDataIntervalTimetable`_ also accepts a cron expression. See `Differences between the two cron timetables`_.
 
 .. code-block:: python
 
-    @dag(schedule_interval="0 1 * * 3")  # At 01:00 on Wednesday.
+    from airflow.timetables.interval import CronTriggerTimetable
+
+
+    @dag(
+        timetable=CronTriggerTimetable(cron='0 1 * * 3', timezone='UTC'),  # At 01:00 on Wednesday
+    )
     def example_dag():
         pass
 
+.. _DeltaDataIntervalTimetable:
+
 DeltaDataIntervalTimetable
 ^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -79,6 +89,23 @@ Schedules data intervals with a time delta. Can be selected by providing a
     def example_dag():
         pass
 
+.. _CronDataIntervalTimetable:
+
+CronDataIntervalTimetable
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Another timetable which accepts a cron expression. Can be selected by providing a string that is a valid
+cron expression to the ``schedule_interval`` parameter of a DAG as described in the :doc:`/concepts/dags` documentation.
+
+
+Note `CronTriggerTimetable`_ also accepts a cron expression. See `Differences between the two cron timetables`_.
+
+.. code-block:: python
+
+    @dag(schedule_interval="0 1 * * 3")  # At 01:00 on Wednesday.
+    def example_dag():
+        pass
+
 EventsTimetable
 ^^^^^^^^^^^^^^^
 
@@ -109,3 +136,50 @@ first) event for the data interval, otherwise manual runs will run with a ``data
     )
     def example_dag():
         pass
+
+.. _Differences between the two cron timetables:
+
+Differences between the two cron timetables
+-------------------------------------------
+
+Two timetables, `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_, accept a cron expression. They differ as follows:
+
+- `CronTriggerTimetable`_ does not take care of *Data Interval*, while `CronDataIntervalTimetable`_ does.
+- The time when a DAG run is triggered by `CronTriggerTimetable`_ is more intuitive and closer to how people
+  expect cron to behave than that of `CronDataIntervalTimetable`_ (when ``catchup`` is ``False``).
+
+Whether taking care of *Data Interval*
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+`CronTriggerTimetable`_ *does not* care about the idea of *data interval*. This means the values of ``data_interval_start``,
+``data_interval_end`` and legacy ``execution_date`` are the same - the time when a DAG run is triggered.
+
+On the other hand, `CronDataIntervalTimetable`_ *does* care about the idea of *data interval*. This means the values of
+``data_interval_start`` and ``data_interval_end`` (and legacy ``execution_date``) are different. They are the start
+and end of the interval respectively.
+
+The time when a DAG run is triggered
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+There is no difference between the two when ``catchup`` is ``True``. :ref:`dag-catchup` tells you how DAG runs are
+triggered when ``catchup`` is ``True``.
+
+When ``catchup`` is ``False``, there is a difference in how a new DAG run is triggered. `CronTriggerTimetable`_ triggers
+a new DAG run *after* the current time, while `CronDataIntervalTimetable`_ triggers one *before* the current time
+(assuming the value of ``start_date`` is in the past).
+
+Here is an example showing how the first DAG run is triggered. Suppose there is a cron expression ``@daily`` or
+``0 0 * * *``, which is meant to run at 12AM every day. If you enable DAGs using the two timetables at 3PM on January
+31st, `CronTriggerTimetable`_ will trigger a new DAG run at 12AM on February 1st. `CronDataIntervalTimetable`_, on the other
+hand, will immediately trigger the DAG run that would have been triggered at 12AM on January 31st had the DAG been
+enabled beforehand.
+
+This is another example showing the difference in the case of skipping DAG runs. Suppose there are two running DAGs
+using the two timetables with a cron expression ``@daily`` or ``0 0 * * *``. If you pause the DAGs at 3PM on January
+31st and re-enable them at 3PM on February 2nd, `CronTriggerTimetable`_ skips the DAG runs which are supposed to
+trigger on February 1st and 2nd. The next DAG run will be triggered at 12AM on February 3rd. `CronDataIntervalTimetable`_,
+on the other hand, skips the DAG runs which are supposed to trigger on February 1st only. A DAG run for February 2nd
+is immediately triggered after you re-enable the DAG.
+
+These examples show that the way `CronTriggerTimetable`_ triggers DAG runs is more intuitive and closer to how
+people expect cron to behave than the way `CronDataIntervalTimetable`_ does.
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 53e5ec352d..4d6335e866 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -26,22 +26,29 @@ import pytest
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.settings import TIMEZONE
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
-from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
+from airflow.timetables.interval import (
+    CronDataIntervalTimetable,
+    CronTriggerTimetable,
+    DeltaDataIntervalTimetable,
+)
 
 START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE)
 
 PREV_DATA_INTERVAL_START = START_DATE
 PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
 PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
+PREV_DATA_INTERVAL_EXACT = DataInterval.exact(PREV_DATA_INTERVAL_END)
 
 CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
 YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
 
 HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
+HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", TIMEZONE)
 HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
 HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))
 
 CRON_TIMETABLE = CronDataIntervalTimetable("30 16 * * *", TIMEZONE)
+CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("30 16 * * *", TIMEZONE)
 DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
 
 
@@ -62,6 +69,23 @@ def test_no_catchup_first_starts_at_current_time(
     assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME + DELTA_FROM_MIDNIGHT)
 
 
+@pytest.mark.parametrize(
+    "last_automated_data_interval",
+    [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL_EXACT, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
+    last_automated_data_interval: Optional[DataInterval],
+) -> None:
+    """With ``catchup=False`` and start_date a day ago, the next run is the next cron time after now."""
+    next_info = CRON_TRIGGER_TIMETABLE.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=YESTERDAY, latest=None, catchup=False),
+    )
+    expected = CURRENT_TIME + DELTA_FROM_MIDNIGHT
+    assert next_info == DagRunInfo.exact(expected)
+
+
 @pytest.mark.parametrize(
     "timetable",
     [
@@ -106,10 +130,88 @@ def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable)
     assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end)
 
 
+@pytest.mark.parametrize(
+    "current_time, earliest, expected",
+    [
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)),
+            id="current_time_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
+            id="current_time_not_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE),
+            pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=TIMEZONE),
+            DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 23, 0, 0, tzinfo=TIMEZONE)),
+            id="future_start_date",
+        ),
+    ],
+)
+def test_hourly_cron_trigger_no_catchup_next_info(
+    current_time: pendulum.DateTime,
+    earliest: pendulum.DateTime,
+    expected: DagRunInfo,
+) -> None:
+    with freezegun.freeze_time(current_time):
+        next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info(
+            last_automated_data_interval=PREV_DATA_INTERVAL_EXACT,
+            restriction=TimeRestriction(earliest=earliest, latest=None, catchup=False),
+        )
+    assert next_info == expected
+
+
+@pytest.mark.parametrize(
+    "last_automated_data_interval, earliest, expected",
+    [
+        pytest.param(
+            DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
+            id="last_automated_on_boundary",
+        ),
+        pytest.param(
+            DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE)),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
+            id="last_automated_not_on_boundary",
+        ),
+        pytest.param(
+            None,
+            pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE),
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=TIMEZONE)),
+            id="no_last_automated_with_earliest_on_boundary",
+        ),
+        pytest.param(
+            None,
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=TIMEZONE),
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=TIMEZONE)),
+            id="no_last_automated_with_earliest_not_on_boundary",
+        ),
+    ],
+)
+def test_hourly_cron_trigger_catchup_next_info(
+    last_automated_data_interval: DataInterval,
+    earliest: pendulum.DateTime,
+    expected: DagRunInfo,
+) -> None:
+    next_info = HOURLY_CRON_TRIGGER_TIMETABLE.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=earliest, latest=None, catchup=True),
+    )
+    assert next_info == expected
+
+
 @pytest.mark.parametrize(
     "timetable",
     [
         pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
+        pytest.param(HOURLY_CRON_TRIGGER_TIMETABLE, id="cron_trigger"),
         pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
         pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
     ],
@@ -126,6 +228,11 @@ def test_validate_success(timetable: Timetable) -> None:
             "[0 0 1 13 0] is not acceptable, out of range",
             id="invalid-cron",
         ),
+        pytest.param(
+            CronTriggerTimetable("0 0 1 13 0", TIMEZONE),
+            "[0 0 1 13 0] is not acceptable, out of range",
+            id="invalid-cron-trigger",
+        ),
         pytest.param(
             DeltaDataIntervalTimetable(datetime.timedelta()),
             "schedule interval must be positive, not datetime.timedelta(0)",