You are viewing a plain-text version of this content. The canonical link to the original mailing-list archive entry was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/11/02 17:44:44 UTC
[airflow] branch v2-2-test updated: Bugfix: Check next run exists before reading data interval (#19307)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-2-test by this push:
new f3d06b1 Bugfix: Check next run exists before reading data interval (#19307)
f3d06b1 is described below
commit f3d06b122129c44a995b845853d847248947bff5
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Tue Nov 2 03:53:37 2021 +0800
Bugfix: Check next run exists before reading data interval (#19307)
Fix #19304, and also fix an issue, introduced in #18897, that broke scheduling of a DAG's first-ever run. We could fix this outside `DAG.get_next_data_interval()`, but if `next_dagrun` is None, the next run's data interval should be None in the first place, so checking inside that function makes sense.
closes https://github.com/apache/airflow/issues/19343
closes https://github.com/apache/airflow/issues/19304
(cherry picked from commit dc4dcaa9ccbec6a1b1ce84d5ee42322ce1fbb081)
---
airflow/models/dag.py | 13 ++++++++-----
tests/models/test_dag.py | 37 +++++++++++++++++++++++++++++++++++++
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 55be0ec..d8fa1d0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -607,12 +607,12 @@ class DAG(LoggingMixin):
return None
return self.timetable._get_prev(timezone.coerce_datetime(dttm))
- def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
+ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval]:
"""Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG's
- schedule if the run does not have an explicit one set, which is possible for
- runs created prior to AIP-39.
+ schedule if the run does not have an explicit one set, which is possible
+ for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a
part of the Python API.
@@ -621,11 +621,14 @@ class DAG(LoggingMixin):
"""
if self.dag_id != dag_model.dag_id:
raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
+ if dag_model.next_dagrun is None: # Next run not scheduled.
+ return None
data_interval = dag_model.next_dagrun_data_interval
if data_interval is not None:
return data_interval
- # Compatibility: runs scheduled before AIP-39 implementation don't have an
- # explicit data interval. Try to infer from the logical date.
+ # Compatibility: A run was scheduled without an explicit data interval.
+ # This means the run was scheduled before AIP-39 implementation. Try to
+ # infer from the logical date.
return self.infer_automated_data_interval(dag_model.next_dagrun)
def get_run_data_interval(self, run: DagRun) -> DataInterval:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5bea2b7..efdff89 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2234,3 +2234,40 @@ def test_iter_dagrun_infos_between_error(caplog):
),
]
assert caplog.records[0].exc_info is not None, "should contain exception context"
+
+
+@pytest.mark.parametrize(
+ "logical_date, data_interval_start, data_interval_end, expected_data_interval",
+ [
+ pytest.param(None, None, None, None, id="no-next-run"),
+ pytest.param(
+ DEFAULT_DATE,
+ DEFAULT_DATE,
+ DEFAULT_DATE + timedelta(days=2),
+ DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=2)),
+ id="modern",
+ ),
+ pytest.param(
+ DEFAULT_DATE,
+ None,
+ None,
+ DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
+ id="legacy",
+ ),
+ ],
+)
+def test_get_next_data_interval(
+ logical_date,
+ data_interval_start,
+ data_interval_end,
+ expected_data_interval,
+):
+ dag = DAG(dag_id="test_get_next_data_interval", schedule_interval="@daily")
+ dag_model = DagModel(
+ dag_id="test_get_next_data_interval",
+ next_dagrun=logical_date,
+ next_dagrun_data_interval_start=data_interval_start,
+ next_dagrun_data_interval_end=data_interval_end,
+ )
+
+ assert dag.get_next_data_interval(dag_model) == expected_data_interval