You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:48 UTC

[airflow] 03/45: Check bag DAG schedule_interval match timetable (#23113)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 90d338fb186c1c543f579c14e770564da86a22dd
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Jun 8 17:58:20 2022 +0800

    Check bag DAG schedule_interval match timetable (#23113)
    
    This guards against the DAG's timetable or schedule_interval being
    changed after it's created. Validation is done by creating a timetable
    and checking that its summary matches schedule_interval. The logic is not
    bullet-proof, especially if a custom timetable does not provide a useful
    summary. But this is the best we can do.
    
    (cherry picked from commit a1a9a8f9a3adc63e783cf3fd699066f35e488d4f)
---
 airflow/exceptions.py    |  4 ++++
 airflow/models/dag.py    | 43 ++++++++++++++++++++++++++++++++++++++++++-
 airflow/models/dagbag.py | 13 +++++++------
 tests/models/test_dag.py | 26 ++++++++++++++++++++++++++
 4 files changed, 79 insertions(+), 7 deletions(-)

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index fa7acf61da..bfb5835fda 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -149,6 +149,10 @@ class AirflowDagDuplicatedIdException(AirflowException):
         return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
 
 
+class AirflowDagInconsistent(AirflowException):
+    """Raise when a DAG has inconsistent attributes."""
+
+
 class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in DAG definition."""
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 54f5b06675..823287dcb1 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -63,7 +63,7 @@ import airflow.templates
 from airflow import settings, utils
 from airflow.compat.functools import cached_property
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, DuplicateTaskIdFound, TaskNotFound
+from airflow.exceptions import AirflowDagInconsistent, AirflowException, DuplicateTaskIdFound, TaskNotFound
 from airflow.models.abstractoperator import AbstractOperator
 from airflow.models.base import ID_LEN, Base
 from airflow.models.dagbag import DagBag
@@ -484,6 +484,47 @@ class DAG(LoggingMixin):
         self._task_group = TaskGroup.create_root(self)
         self.validate_schedule_and_params()
 
+    def _check_schedule_interval_matches_timetable(self) -> bool:
+        """Check ``schedule_interval`` and ``timetable`` match.
+
+        This is done as a part of the DAG validation done before it's bagged, to
+        guard against the DAG's ``timetable`` (or ``schedule_interval``) from
+        being changed after it's created, e.g.
+
+        .. code-block:: python
+
+            dag1 = DAG("d1", timetable=MyTimetable())
+            dag1.schedule_interval = "@once"
+
+            dag2 = DAG("d2", schedule_interval="@once")
+            dag2.timetable = MyTimetable()
+
+        Validation is done by creating a timetable and check its summary matches
+        ``schedule_interval``. The logic is not bullet-proof, especially if a
+        custom timetable does not provide a useful ``summary``. But this is the
+        best we can do.
+        """
+        if self.schedule_interval == self.timetable.summary:
+            return True
+        try:
+            timetable = create_timetable(self.schedule_interval, self.timezone)
+        except ValueError:
+            return False
+        return timetable.summary == self.timetable.summary
+
+    def validate(self):
+        """Validate the DAG has a coherent setup.
+
+        This is called by the DAG bag before bagging the DAG.
+        """
+        if not self._check_schedule_interval_matches_timetable():
+            raise AirflowDagInconsistent(
+                f"inconsistent schedule: timetable {self.timetable.summary!r} "
+                f"does not match schedule_interval {self.schedule_interval!r}",
+            )
+        self.params.validate()
+        self.timetable.validate()
+
     def __repr__(self):
         return f"<DAG: {self.dag_id}>"
 
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 3673ce095e..c0ef0941b6 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -39,6 +39,7 @@ from airflow.exceptions import (
     AirflowClusterPolicyViolation,
     AirflowDagCycleException,
     AirflowDagDuplicatedIdException,
+    AirflowDagInconsistent,
     AirflowTimetableInvalid,
     ParamValidationError,
 )
@@ -402,25 +403,25 @@ class DagBag(LoggingMixin):
         for (dag, mod) in top_level_dags:
             dag.fileloc = mod.__file__
             try:
-                dag.timetable.validate()
-                # validate dag params
-                dag.params.validate()
+                dag.validate()
                 self.bag_dag(dag=dag, root_dag=dag)
-                found_dags.append(dag)
-                found_dags += dag.subdags
             except AirflowTimetableInvalid as exception:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                 self.import_errors[dag.fileloc] = f"Invalid timetable expression: {exception}"
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
             except (
+                AirflowClusterPolicyViolation,
                 AirflowDagCycleException,
                 AirflowDagDuplicatedIdException,
-                AirflowClusterPolicyViolation,
+                AirflowDagInconsistent,
                 ParamValidationError,
             ) as exception:
                 self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                 self.import_errors[dag.fileloc] = str(exception)
                 self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
+            else:
+                found_dags.append(dag)
+                found_dags += dag.subdags
         return found_dags
 
     def bag_dag(self, dag, root_dag):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9e3c46a602..0164ce0f87 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2243,6 +2243,32 @@ class TestDagDecorator:
         assert dag.params['value'] == value
 
 
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_match_schedule_interval(timetable):
+    dag = DAG("my-dag", timetable=timetable)
+    assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_match_timetable(schedule_interval):
+    dag = DAG("my-dag", schedule_interval=schedule_interval)
+    assert dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)])
+def test_dag_schedule_interval_change_after_init(schedule_interval):
+    dag = DAG("my-dag", timetable=OnceTimetable())
+    dag.schedule_interval = schedule_interval
+    assert not dag._check_schedule_interval_matches_timetable()
+
+
+@pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()])
+def test_dag_timetable_change_after_init(timetable):
+    dag = DAG("my-dag")  # Default is timedelta(days=1).
+    dag.timetable = timetable
+    assert not dag._check_schedule_interval_matches_timetable()
+
+
 @pytest.mark.parametrize("run_id, execution_date", [(None, datetime_tz(2020, 1, 1)), ('test-run-id', None)])
 def test_set_task_instance_state(run_id, execution_date, session, dag_maker):
     """Test that set_task_instance_state updates the TaskInstance state and clear downstream failed"""