You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:48 UTC
[airflow] 03/45: Check DAG schedule_interval matches timetable when bagging (#23113)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 90d338fb186c1c543f579c14e770564da86a22dd
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Jun 8 17:58:20 2022 +0800
Check DAG schedule_interval matches timetable when bagging (#23113)
This guards against the DAG's timetable or schedule_interval being
changed after the DAG is created. Validation is done by creating a
timetable from schedule_interval and checking that its summary matches
the DAG timetable's summary. The logic is not bullet-proof, especially if
a custom timetable does not provide a useful summary, but this is the best we can do.
(cherry picked from commit a1a9a8f9a3adc63e783cf3fd699066f35e488d4f)
---
airflow/exceptions.py | 4 ++++
airflow/models/dag.py | 43 ++++++++++++++++++++++++++++++++++++++++++-
airflow/models/dagbag.py | 13 +++++++------
tests/models/test_dag.py | 26 ++++++++++++++++++++++++++
4 files changed, 79 insertions(+), 7 deletions(-)
diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index fa7acf61da..bfb5835fda 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -149,6 +149,10 @@ class AirflowDagDuplicatedIdException(AirflowException):
return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
+class AirflowDagInconsistent(AirflowException):
+ """Raise when a DAG has inconsistent attributes, e.g. a schedule_interval that does not match its timetable."""
+
+
class AirflowClusterPolicyViolation(AirflowException):
"""Raise when there is a violation of a Cluster Policy in DAG definition."""
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 54f5b06675..823287dcb1 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -63,7 +63,7 @@ import airflow.templates
from airflow import settings, utils
from airflow.compat.functools import cached_property
from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound
+from airflow.exceptions import AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, TaskNotFound
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.base import ID_LEN, Base
from airflow.models.dagbag import DagBag
@@ -484,6 +484,47 @@ class DAG(LoggingMixin):
self._task_group = TaskGroup.create_root(self)
self.validate_schedule_and_params()
+ def _check_schedule_interval_matches_timetable(self) -> bool:
+ """Return whether ``schedule_interval`` and ``timetable`` match.
+
+ This is done as a part of the DAG validation done before it's bagged, to
+ guard against the DAG's ``timetable`` (or ``schedule_interval``) being
+ changed after the DAG is created, e.g.
+
+ .. code-block:: python
+
+ dag1 = DAG("d1", timetable=MyTimetable())
+ dag1.schedule_interval = "@once"
+
+ dag2 = DAG("d2", schedule_interval="@once")
+ dag2.timetable = MyTimetable()
+
+ Validation compares ``schedule_interval`` to ``timetable.summary``, first
+ directly, then via a timetable created from ``schedule_interval`` (if that
+ raises ``ValueError``, the two are considered mismatched). Not bullet-proof if a
+ custom timetable lacks a useful ``summary``, but this is the best we can do.
+ """
+ if self.schedule_interval == self.timetable.summary:
+ return True
+ try:
+ timetable = create_timetable(self.schedule_interval, self.timezone)
+ except ValueError:
+ return False
+ return timetable.summary == self.timetable.summary
+
+ def validate(self):
+ """Validate that the DAG has a coherent setup.
+
+ Called by the DAG bag before bagging the DAG. Raises ``AirflowDagInconsistent`` on a schedule mismatch, then validates params and timetable.
+ """
+ if not self._check_schedule_interval_matches_timetable():
+ raise AirflowDagInconsistent(
+ f"inconsistent schedule: timetable {self.timetable.summary!r} "
+ f"does not match schedule_interval {self.schedule_interval!r}",
+ )
+ self.params.validate()
+ self.timetable.validate()
+
def __repr__(self):
return f"<DAG: {self.dag_id}>"
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 3673ce095e..c0ef0941b6 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -39,6 +39,7 @@ from airflow.exceptions import (
AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
+ AirflowDagInconsistent,
AirflowTimetableInvalid,
ParamValidationError,
)
@@ -402,25 +403,25 @@ class DagBag(LoggingMixin):
for (dag, mod) in top_level_dags:
dag.fileloc = mod.__file__
try:
- dag.timetable.validate()
- # validate dag params
- dag.params.validate()
+ dag.validate()  # Checks schedule consistency, then params and timetable, before bagging.
self.bag_dag(dag=dag, root_dag=dag)
- found_dags.append(dag)
- found_dags += dag.subdags
except AirflowTimetableInvalid as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = f"Invalid timetable expression: {exception}"
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
except (
+ AirflowClusterPolicyViolation,
AirflowDagCycleException,
AirflowDagDuplicatedIdException,
- AirflowClusterPolicyViolation,
+ AirflowDagInconsistent,
ParamValidationError,
) as exception:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
self.import_errors[dag.fileloc] = str(exception)
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
+ else:  # Only collect the DAG (and its subdags) once validation and bagging both succeeded.
+ found_dags.append(dag)
+ found_dags += dag.subdags
return found_dags
def bag_dag(self, dag, root_dag):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9e3c46a602..0164ce0f87 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2243,6 +2243,32 @@ class TestDagDecorator:
assert dag.params['value'] == value
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_match_schedule_interval(timetable):
+ dag = DAG("my-dag", timetable=timetable)  # schedule_interval is derived at init, so it should match.
+ assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_match_timetable(schedule_interval):
+ dag = DAG("my-dag", schedule_interval=schedule_interval)  # timetable is derived at init, so it should match.
+ assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_change_after_init(schedule_interval):
+ dag = DAG("my-dag", timetable=OnceTimetable())
+ dag.schedule_interval = schedule_interval  # Mutated after init; the check must detect the mismatch.
+ assert not dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_change_after_init(timetable):
+ dag = DAG("my-dag") # Default is timedelta(days=1).
+ dag.timetable = timetable  # Mutated after init; the check must detect the mismatch.
+ assert not dag._check_schedule_interval_matches_timetable()
+
+
@pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ('test-run-id', None)])
def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
"""Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""