Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/30 00:14:25 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #16352: AIP-39: ``DagRun.data_interval_start`` and ``data_interval_end``

kaxil commented on a change in pull request #16352:
URL: https://github.com/apache/airflow/pull/16352#discussion_r679537509



##########
File path: airflow/models/dag.py
##########
@@ -596,23 +649,17 @@ def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
         :return: A list of dates within the interval following the dag's schedule.

Review comment:
      We should remove ``align`` from the docstring
   
   https://github.com/apache/airflow/blob/47908878a96775db49eac49339926c0ccff1e3be/airflow/models/dag.py#L644-L648

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``ealiest``, even if it does not fall on the logical timetable schedule.

Review comment:
       ```suggestion
           ``earliest``, even if it does not fall on the logical timetable schedule.
   ```

##########
File path: airflow/timetables/base.py
##########
@@ -79,6 +74,18 @@ def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
         """
         return cls(run_after=end, data_interval=DataInterval(start, end))
 
+    @property
+    def schedule_date(self) -> DateTime:

Review comment:
       Shouldn't this be `run_date` based on our proposal?
   
   https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval#AIP39Richerscheduler_interval-AbstractTimetableinterface
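   
   If we do rename it, a minimal sketch (the property body is elided in this hunk, so the return value below is an assumption based on the AIP-39 data-interval semantics):
   
   ```python
   @property
   def run_date(self) -> DateTime:
       # Hypothetical rename of ``schedule_date``; assumes the date is
       # derived from the start of the data interval.
       return self.data_interval.start
   ```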

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."

Review comment:
       ```suggestion
                       "will be removed in a future version."
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually trioggered

Review comment:
       ```suggestion
           # scheduled at each midnight, the data interval of a manually triggered
   ```

##########
File path: airflow/models/dag.py
##########
@@ -2403,6 +2461,24 @@ def __init__(self, concurrency=None, **kwargs):
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
+    @property
+    def next_data_interval(self) -> Optional[Tuple[datetime, datetime]]:
+        if self.next_dagrun_data_interval_start is None:
+            if self.next_dagrun_data_interval_end is not None:
+                raise AirflowException(
+                    "Inconsistent DagModel: next_dagrun_data_interval_start and "
+                    "next_dagrun_data_interval_end must be either both None or both datetime"
+                )

Review comment:
      Let's also show the values that were passed; that should help with debugging.
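   
   For example (a sketch; the exact wording is up to you):
   
   ```python
   raise AirflowException(
       "Inconsistent DagModel: next_dagrun_data_interval_start and "
       "next_dagrun_data_interval_end must be either both None or both datetime, "
       f"but are {self.next_dagrun_data_interval_start!r} and "
       f"{self.next_dagrun_data_interval_end!r}"
   )
   ```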

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``ealiest``, even if it does not fall on the logical timetable schedule.
+        The default is ``True``, but subdags will ignore this value and always
+        behave as if this is set to ``False`` for backward compatibility.
+        """
+        if earliest is None:
+            earliest = self._time_restriction.earliest
+        earliest = timezone.coerce_datetime(earliest)
+        latest = timezone.coerce_datetime(latest)
+
+        restriction = TimeRestriction(earliest, latest, catchup=True)
+
+        # HACK: Sub-DAGs are currently scheduled differently. For example, say
+        # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
+        # DAG should be first scheduled to run on midnight 2021-06-04, but a
+        # sub-DAG should be first scheduled to run RIGHT NOW. We can change
+        # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
+        # compatibility for now and remove this entirely later.
+        if self.is_subdag:
+            align = False
+
+        info = self.timetable.next_dagrun_info(None, restriction)
+        if info is None:
+            # No runs to be scheduled between the user-supplied timeframe. But
+            # if align=False, "invent" a data interval for the timeframe itself.
+            if not align:
+                yield DagRunInfo.interval(earliest, latest)
+            return
+
+        # If align=False and earliest is not a logical schedule date, "invent"
+        # a data interval betwwen it and the first schedule date.

Review comment:
       ```suggestion
           # a data interval between it and the first schedule date.
   ```

##########
File path: airflow/jobs/backfill_job.py
##########
@@ -755,8 +759,13 @@ def _execute(self, session=None):
 
         start_date = self.bf_start_date
 
-        # Get intervals between the start/end dates, which will turn into dag runs
-        run_dates = self.dag.get_run_dates(start_date=start_date, end_date=self.bf_end_date, align=True)
+        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
+        dagrun_start_date = timezone.coerce_datetime(start_date)
+        if self.bf_end_date is None:
+            dagrun_end_date = pendulum.now(timezone.utc)
+        else:
+            dagrun_end_date = pendulum.instance(self.bf_end_date)

Review comment:
      What do you think about handling this inside `iter_dagrun_infos_between` (similar to what we had earlier in `get_run_dates`), instead of at each call site?
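   
   A sketch of what that could look like (assumes we make ``latest`` optional and default it to now, mirroring the old ``get_run_dates`` behavior):
   
   ```python
   def iter_dagrun_infos_between(
       self,
       earliest: Optional[pendulum.DateTime],
       latest: Optional[pendulum.DateTime] = None,
       *,
       align: bool = True,
   ) -> Iterable[DagRunInfo]:
       if earliest is None:
           earliest = self._time_restriction.earliest
       if latest is None:
           latest = pendulum.now(timezone.utc)
       earliest = timezone.coerce_datetime(earliest)
       latest = timezone.coerce_datetime(latest)
       ...
   ```
   
   Then the backfill job could pass ``self.bf_end_date`` straight through.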

##########
File path: airflow/models/dag.py
##########
@@ -579,7 +580,59 @@ def timetable(self) -> Timetable:
         type_name = type(interval).__name__
         raise TypeError(f"{type_name} is not a valid DAG.schedule_interval.")
 
-    def get_run_dates(self, start_date, end_date=None, *, align: bool = True):
+    def iter_dagrun_infos_between(
+        self,
+        earliest: Optional[pendulum.DateTime],
+        latest: pendulum.DateTime,
+        *,
+        align: bool = True,
+    ) -> Iterable[DagRunInfo]:
+        """Yield DagRunInfo using this DAG's timetable between given interval.
+
+        DagRunInfo instances yielded if their ``schedule_date`` is not earlier
+        than ``earliest``, nor later than ``latest``. The instances are ordered
+        by their ``schedule_date`` from earliest to latest.
+
+        If ``align`` is ``False``, the first run will happen immediately on
+        ``ealiest``, even if it does not fall on the logical timetable schedule.

Review comment:
      Also, an example here with actual dates might make this easier to understand.
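   
   For instance, reusing the dates from the sub-DAG comment below (a sketch of what could go in the docstring; the ``align=True`` interval follows from the ``@daily`` schedule):
   
   ```python
   """
   For example, with ``schedule_interval="@daily"`` and
   ``earliest = 2021-06-03 22:16:00``:
   
   * ``align=True``: the first DagRunInfo covers the first complete
     schedule interval, 2021-06-04 00:00 to 2021-06-05 00:00.
   * ``align=False``: an extra interval is "invented" from
     2021-06-03 22:16:00 to 2021-06-04 00:00, so the first run
     happens immediately on ``earliest``; the scheduled intervals
     then follow.
   """
   ```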

##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()

Review comment:
       ```suggestion
           dag_run = self.get_dagrun(session=session)
   ```
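   
   (Assuming `get_dagrun` is wrapped in `@provide_session`, passing the active session avoids opening a second one mid-transaction:)
   
   ```python
   dag_run = self.get_dagrun()                  # @provide_session opens a new session
   dag_run = self.get_dagrun(session=session)   # reuses the caller's transaction
   ```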

##########
File path: airflow/migrations/versions/142555e44c17_add_data_interval_start_end_to_dagrun.py
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add data_interval_[start|end] to DagRun.
+
+Revision ID: 142555e44c17
+Revises: e9304a3141f0
+Create Date: 2021-06-09 08:28:02.089817
+
+"""
+
+from alembic import op
+from sqlalchemy import Column
+from sqlalchemy.dialects import mssql
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# Revision identifiers, used by Alembic.
+revision = "142555e44c17"
+down_revision = "97cdd93827b8"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply data_interval fields to DagModel and DagRun."""
+    if op.get_bind().dialect.name == "mssql":
+        column_type = mssql.DATETIME2(precision=6)
+    else:
+        column_type = UtcDateTime

Review comment:
       https://github.com/apache/airflow/blob/c384f9b0f509bab704a70380465be18754800a52/airflow/migrations/versions/98271e7606e2_add_scheduling_decision_to_dagrun_and_.py#L48-L55
   
   Do we need something like the above to take care of MySQL precision too, to avoid issues like https://github.com/apache/airflow/pull/9336? Worth testing this with MySQL.
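   
   If so, a sketch following that migration's pattern (assumes `mysql.TIMESTAMP(fsp=6)` is the type used there to keep microsecond precision):
   
   ```python
   from sqlalchemy.dialects import mssql, mysql
   
   def upgrade():
       """Apply data_interval fields to DagModel and DagRun."""
       dialect_name = op.get_bind().dialect.name
       if dialect_name == "mssql":
           column_type = mssql.DATETIME2(precision=6)
       elif dialect_name == "mysql":
           # Keep fractional seconds on MySQL to avoid truncated microseconds.
           column_type = mysql.TIMESTAMP(fsp=6)
       else:
           column_type = UtcDateTime
   ```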

##########
File path: airflow/models/taskinstance.py
##########
@@ -1530,21 +1532,25 @@ def get_template_context(self, session=None) -> Context:
         integrate_macros_plugins()
 
         params = {}  # type: Dict[str, Any]
-        run_id = ''
-        dag_run = None
-        if hasattr(task, 'dag'):
-            if task.dag.params:
-                params.update(task.dag.params)
-            from airflow.models.dagrun import DagRun  # Avoid circular import
-
-            dag_run = (
-                session.query(DagRun)
-                .filter_by(dag_id=task.dag.dag_id, execution_date=self.execution_date)
-                .first()
+        with contextlib.suppress(AttributeError):
+            params.update(task.dag.params)
+
+        dag_run = self.get_dagrun()
+
+        # FIXME: Many tests don't create a DagRun. We should fix the tests.
+        if dag_run is None:
+            FakeDagRun = namedtuple(
+                "FakeDagRun",
+                # A minimal set of attributes to keep things working.
+                "conf data_interval_start data_interval_end external_trigger run_id",
+            )
+            dag_run = FakeDagRun(
+                conf=None,
+                data_interval_start=None,
+                data_interval_end=None,
+                external_trigger=False,
+                run_id="",

Review comment:
       ```suggestion
                   run_id=None,
   ```

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        def get_previous_schedule_date() -> Optional[pendulum.DateTime]:
+            ti = self.get_previous_ti(None)

Review comment:
       ```suggestion
               ti = self.get_previous_ti(session=session)
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -43,6 +43,13 @@ def __eq__(self, other: Any) -> bool:
     def validate(self) -> None:
         self._schedule.validate()
 
+    def infer_data_interval(self, run_after: DateTime) -> Optional[DataInterval]:
+        # Get the last complete period before run_after, e.g. if a DAG run is
+        # scheduled at each midnight, the data interval of a manually trioggered
+        # run at 1am 25th is between 0am 24th and 0am 25th.

Review comment:
       Options:
   
   (1) Shouldn't `data_interval_start_date` be 1 am on the 24th and `data_interval_end_date` be 1 am on the 25th, so that `data_interval_end_date` = `schedule_date` / `run_date`? Why the offset of 1 hour?
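   
   To make the two readings concrete (same timestamps as the comment above):
   
   ```python
   # Behavior in the diff: last *complete* schedule period before run_after.
   #   run_after = 25th 01:00  ->  DataInterval(24th 00:00, 25th 00:00)
   # Option (1): the interval ends exactly at run_after.
   #   run_after = 25th 01:00  ->  DataInterval(24th 01:00, 25th 01:00)
   ```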

##########
File path: tests/core/test_core.py
##########
@@ -279,23 +281,16 @@ def test_task_get_template(self):
         assert context['ds'] == '2015-01-01'
         assert context['ds_nodash'] == '20150101'
 
-        # next_ds is 2015-01-02 as the dag interval is daily
+        # next_ds is 2015-01-02 as the dag schedule is daily.
         assert context['next_ds'] == '2015-01-02'
         assert context['next_ds_nodash'] == '20150102'
 
-        # prev_ds is 2014-12-31 as the dag interval is daily
-        assert context['prev_ds'] == '2014-12-31'
-        assert context['prev_ds_nodash'] == '20141231'
-
         assert context['ts'] == '2015-01-01T00:00:00+00:00'
         assert context['ts_nodash'] == '20150101T000000'
         assert context['ts_nodash_with_tz'] == '20150101T000000+0000'
 
-        assert context['yesterday_ds'] == '2014-12-31'
-        assert context['yesterday_ds_nodash'] == '20141231'
-
-        assert context['tomorrow_ds'] == '2015-01-02'
-        assert context['tomorrow_ds_nodash'] == '20150102'
+        assert context['data_interval_start'].isoformat() == '2015-01-01T00:00:00+00:00'
+        assert context['data_interval_end'].isoformat() == '2015-01-02T00:00:00+00:00'

Review comment:
      Let's add tests for the deprecated context variables too.
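   
   Something like this sketch could cover them (assumes the deprecated keys are served through the `deprecated_proxy` helper elsewhere in this PR, so evaluating one should emit a `DeprecationWarning`; setup mirrors `test_task_get_template`):
   
   ```python
   import pytest
   
   with pytest.warns(DeprecationWarning):
       # str() forces the lazy proxy, which should trigger the warning.
       assert str(context['prev_ds']) == '2014-12-31'
   with pytest.warns(DeprecationWarning):
       assert str(context['yesterday_ds']) == '2014-12-31'
   ```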

##########
File path: tests/models/test_dag.py
##########
@@ -2015,3 +2016,50 @@ def get_task_instance(session, task):
         assert dagrun.get_state() == State.QUEUED
 
     assert {t.key for t in altered} == {('test_set_task_instance_state', 'task_1', start_date, 1)}
+
+
+@pytest.mark.parametrize(
+    "start_date, expected_infos",
+    [
+        (
+            DEFAULT_DATE,
+            [DagRunInfo.interval(DEFAULT_DATE, DEFAULT_DATE + datetime.timedelta(hours=1))],
+        ),
+        (
+            DEFAULT_DATE - datetime.timedelta(hours=3),
+            [
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=3),
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=2),
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE - datetime.timedelta(hours=1),
+                    DEFAULT_DATE,
+                ),
+                DagRunInfo.interval(
+                    DEFAULT_DATE,
+                    DEFAULT_DATE + datetime.timedelta(hours=1),
+                ),
+            ],
+        ),
+    ],
+    ids=["in-dag-restriction", "out-of-dag-restriction"],
+)
+def test_iter_dagrun_infos_between(start_date, expected_infos):
+    dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval="@hourly")
+    DummyOperator(
+        task_id='dummy',
+        dag=dag,
+        owner='airflow',
+    )

Review comment:
       ```suggestion
       DummyOperator(task_id='dummy', dag=dag)
   ```
   
   for simplicity -- we don't need the `owner` field

##########
File path: airflow/models/taskinstance.py
##########
@@ -1670,37 +1667,97 @@ def get(
                 except AirflowNotFoundException:
                     return default_conn
 
+        def deprecated_proxy(func, *, key, replacement=None) -> lazy_object_proxy.Proxy:
+            def deprecated_func():
+                message = (
+                    f"Accessing {key!r} from the template is deprecated and "
+                    f"will be removed in a future version."
+                )
+                if replacement:
+                    message += f" Please use {replacement!r} instead."
+                warnings.warn(message, DeprecationWarning)
+                return func()
+
+            return lazy_object_proxy.Proxy(deprecated_func)
+
+        def get_previous_schedule_date() -> Optional[pendulum.DateTime]:
+            ti = self.get_previous_ti(None)
+            if ti is None:
+                return None
+            return timezone.coerce_datetime(ti.execution_date)
+
+        def get_prev_execution_date():
+            if dag_run.external_trigger:
+                dt = self.execution_date
+            else:
+                dt = task.dag.previous_schedule(self.execution_date)
+            return timezone.coerce_datetime(dt)

Review comment:
      Should we suppress deprecation warnings from the following? Otherwise the user will see the warning but will have no way to act on it, since the context dict is passed to each operator:
   
   https://github.com/apache/airflow/blob/47908878a96775db49eac49339926c0ccff1e3be/airflow/models/taskinstance.py#L812-L819
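   
   If so, a sketch of what the suppression could look like (the exact placement depends on those linked lines, and the call site below is hypothetical):
   
   ```python
   import warnings
   
   with warnings.catch_warnings():
       # Only user template code should see the deprecation warning, not
       # Airflow's own per-operator handling of the context dict.
       warnings.simplefilter("ignore", DeprecationWarning)
       task.render_template_fields(context)  # hypothetical internal call site
   ```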




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
