Posted to commits@airflow.apache.org by je...@apache.org on 2021/11/03 21:30:09 UTC

[airflow] 10/17: Bugfix: Check next run exists before reading data interval (#19307)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 06c1cea45bb0f94189b2f3488884d1b566b08fb2
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Nov 2 03:53:37 2021 +0800

    Bugfix: Check next run exists before reading data interval (#19307)
    
    Fix #19304, and also an issue with scheduling a DAG's first-ever run that was introduced in #18897. We could fix this outside the function, but if `next_dagrun` is None, the next run's data interval is supposed to be None in the first place, so it makes sense to do the check inside this function.
    
    closes https://github.com/apache/airflow/issues/19343
    closes https://github.com/apache/airflow/issues/19304
    
    (cherry picked from commit dc4dcaa9ccbec6a1b1ce84d5ee42322ce1fbb081)
---
 airflow/models/dag.py    | 13 ++++++++-----
 tests/models/test_dag.py | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 5 deletions(-)
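
For context, a minimal caller-side sketch of how code that consumes this method might handle the return type now being `Optional[DataInterval]`. This is not part of the patch; `describe_next_run` is a hypothetical helper, and the scheduler's real call sites are not shown in this commit.

    from airflow.models.dag import DAG, DagModel

    def describe_next_run(dag: DAG, dag_model: DagModel) -> str:
        # After this fix, get_next_data_interval() returns None when
        # dag_model.next_dagrun is None (no next run is scheduled), instead
        # of trying to infer an interval from a missing logical date.
        data_interval = dag.get_next_data_interval(dag_model)
        if data_interval is None:
            return f"{dag.dag_id}: no next run scheduled"
        return f"{dag.dag_id}: next run covers {data_interval.start} to {data_interval.end}"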

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 55be0ec..d8fa1d0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -607,12 +607,12 @@ class DAG(LoggingMixin):
             return None
         return self.timetable._get_prev(timezone.coerce_datetime(dttm))
 
-    def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
+    def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval]:
         """Get the data interval of the next scheduled run.
 
         For compatibility, this method infers the data interval from the DAG's
-        schedule if the run does not have an explicit one set, which is possible for
-        runs created prior to AIP-39.
+        schedule if the run does not have an explicit one set, which is possible
+        for runs created prior to AIP-39.
 
         This function is private to Airflow core and should not be depended as a
         part of the Python API.
@@ -621,11 +621,14 @@ class DAG(LoggingMixin):
         """
         if self.dag_id != dag_model.dag_id:
             raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
+        if dag_model.next_dagrun is None:  # Next run not scheduled.
+            return None
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
-        # Compatibility: runs scheduled before AIP-39 implementation don't have an
-        # explicit data interval. Try to infer from the logical date.
+        # Compatibility: A run was scheduled without an explicit data interval.
+        # This means the run was scheduled before AIP-39 implementation. Try to
+        # infer from the logical date.
         return self.infer_automated_data_interval(dag_model.next_dagrun)
 
     def get_run_data_interval(self, run: DagRun) -> DataInterval:
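
The compatibility branch above falls back to inferring the interval from the logical date alone. As a rough illustration (not part of the patch, with an illustrative dag_id), a `@daily` schedule expands a bare logical date into a one-day interval starting at that date, which is also what the "legacy" test case below asserts:

    import pendulum
    from airflow.models.dag import DAG

    dag = DAG(dag_id="example_legacy_interval", schedule_interval="@daily")
    # A pre-AIP-39 row only carries a logical date, so infer the interval from it.
    logical_date = pendulum.datetime(2021, 11, 1, tz="UTC")
    interval = dag.infer_automated_data_interval(logical_date)
    assert interval.start == logical_date
    assert interval.end == logical_date.add(days=1)
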
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5bea2b7..efdff89 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2234,3 +2234,40 @@ def test_iter_dagrun_infos_between_error(caplog):
         ),
     ]
     assert caplog.records[0].exc_info is not None, "should contain exception context"
+
+
+@pytest.mark.parametrize(
+    "logical_date, data_interval_start, data_interval_end, expected_data_interval",
+    [
+        pytest.param(None, None, None, None, id="no-next-run"),
+        pytest.param(
+            DEFAULT_DATE,
+            DEFAULT_DATE,
+            DEFAULT_DATE + timedelta(days=2),
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=2)),
+            id="modern",
+        ),
+        pytest.param(
+            DEFAULT_DATE,
+            None,
+            None,
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
+            id="legacy",
+        ),
+    ],
+)
+def test_get_next_data_interval(
+    logical_date,
+    data_interval_start,
+    data_interval_end,
+    expected_data_interval,
+):
+    dag = DAG(dag_id="test_get_next_data_interval", schedule_interval="@daily")
+    dag_model = DagModel(
+        dag_id="test_get_next_data_interval",
+        next_dagrun=logical_date,
+        next_dagrun_data_interval_start=data_interval_start,
+        next_dagrun_data_interval_end=data_interval_end,
+    )
+
+    assert dag.get_next_data_interval(dag_model) == expected_data_interval
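
The parametrized cases above map one-to-one onto the three code paths: no next run scheduled, an explicit post-AIP-39 data interval, and a legacy interval inferred from the logical date. As a standalone sketch of the first case (not part of the patch, with an illustrative dag_id), the short-circuit can be exercised without a database session, just as the test does:

    from airflow.models.dag import DAG, DagModel

    dag = DAG(dag_id="example_no_next_run", schedule_interval="@daily")
    # next_dagrun is None, i.e. no next run is scheduled, so the method now
    # returns None instead of trying to infer an interval from None.
    dag_model = DagModel(
        dag_id="example_no_next_run",
        next_dagrun=None,
        next_dagrun_data_interval_start=None,
        next_dagrun_data_interval_end=None,
    )
    assert dag.get_next_data_interval(dag_model) is None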