You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:15 UTC

[airflow] 25/38: Don't show stale Serialized DAGs if they are deleted in DB (#16368)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5f478eca8c1b804a630f8f2eb71b37c0e4b27445
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 11 21:29:56 2021 +0100

    Don't show stale Serialized DAGs if they are deleted in DB (#16368)
    
    Currently, `DagBag.get_dag()` returns the DAG from its local cache
    even if the DAG no longer exists in the `serialized_dag` table.
    
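    This PR changes that logic so that, when the DAG is no longer in the
    `serialized_dag` table, `DagBag.get_dag()` also removes it from the
    local cache and returns `None`. The check only runs once
    `min_serialized_dag_fetch_secs` has elapsed since the DAG was last
    fetched.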
    
    (cherry picked from commit e3b3c1fd1cf61b5d1bbe7aef11ddc85b9a7aa171)
---
 airflow/models/dagbag.py         | 11 ++++++++++-
 airflow/models/serialized_dag.py |  4 ++--
 tests/models/test_dagbag.py      | 31 ++++++++++++++++++++++++++++++-
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index b78463b..be2701b 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -190,6 +190,8 @@ class DagBag(LoggingMixin):
             # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
             # 2. check the last_updated column in SerializedDag table to see if Serialized DAG is updated
             # 3. if (2) is yes, fetch the Serialized DAG.
+            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
+            # if it exists and return None.
             min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
             if (
                 dag_id in self.dags_last_fetched
@@ -199,7 +201,14 @@ class DagBag(LoggingMixin):
                     dag_id=dag_id,
                     session=session,
                 )
-                if sd_last_updated_datetime and sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
+                if not sd_last_updated_datetime:
+                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
+                    del self.dags[dag_id]
+                    del self.dags_last_fetched[dag_id]
+                    del self.dags_hash[dag_id]
+                    return None
+
+                if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
                     self._add_dag_from_db(dag_id=dag_id, session=session)
 
             return self.dags.get(dag_id)
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 81448b6..4e8ebc4 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -261,7 +261,7 @@ class SerializedDagModel(Base):
 
     @classmethod
     @provide_session
-    def get_last_updated_datetime(cls, dag_id: str, session: Session = None) -> datetime:
+    def get_last_updated_datetime(cls, dag_id: str, session: Session = None) -> Optional[datetime]:
         """
         Get the date when the Serialized DAG associated to DAG was last updated
         in serialized_dag table
@@ -295,7 +295,7 @@ class SerializedDagModel(Base):
         :param session: ORM Session
         :type session: Session
         :return: DAG Hash
-        :rtype: str
+        :rtype: str | None
         """
         return session.query(cls.dag_hash).filter(cls.dag_id == dag_id).scalar()
 
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 359cd5c..0c52c49 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -19,7 +19,7 @@ import os
 import shutil
 import textwrap
 import unittest
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from tempfile import NamedTemporaryFile, mkdtemp
 from unittest import mock
 from unittest.mock import patch
@@ -33,6 +33,7 @@ from airflow import models
 from airflow.exceptions import SerializationError
 from airflow.models import DagBag, DagModel
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils.dates import timezone as tz
 from airflow.utils.session import create_session
 from airflow.www.security import ApplessAirflowSecurityManager
@@ -311,6 +312,34 @@ class TestDagBag(unittest.TestCase):
         assert dag_id == dag.dag_id
         assert 2 == dagbag.process_file_calls
 
+    def test_dag_removed_if_serialized_dag_is_removed(self):
+        """
+        Test that if a DAG does not exist in serialized_dag table (as the DAG file was removed),
+        remove dags from the DagBag
+        """
+        from airflow.operators.dummy import DummyOperator
+
+        dag = models.DAG(
+            dag_id="test_dag_removed_if_serialized_dag_is_removed",
+            schedule_interval=None,
+            start_date=tz.datetime(2021, 10, 12),
+        )
+
+        with dag:
+            DummyOperator(task_id="task_1")
+
+        dagbag = DagBag(dag_folder=self.empty_dir, include_examples=False, read_dags_from_db=True)
+        dagbag.dags = {dag.dag_id: SerializedDAG.from_dict(SerializedDAG.to_dict(dag))}
+        dagbag.dags_last_fetched = {dag.dag_id: (tz.utcnow() - timedelta(minutes=2))}
+        dagbag.dags_hash = {dag.dag_id: mock.ANY}
+
+        assert SerializedDagModel.has_dag(dag.dag_id) is False
+
+        assert dagbag.get_dag(dag.dag_id) is None
+        assert dag.dag_id not in dagbag.dags
+        assert dag.dag_id not in dagbag.dags_last_fetched
+        assert dag.dag_id not in dagbag.dags_hash
+
     def process_dag(self, create_dag):
         """
         Helper method to process a file generated from the input create_dag function.