You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/06/22 13:46:15 UTC
[airflow] 25/38: Don't show stale Serialized DAGs if they are
deleted in DB (#16368)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5f478eca8c1b804a630f8f2eb71b37c0e4b27445
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jun 11 21:29:56 2021 +0100
Don't show stale Serialized DAGs if they are deleted in DB (#16368)
Currently, `DagBag.get_dag()` returns the DAG from its local cache
even if the DAG no longer exists in the `serialized_dag` table.
This PR changes that logic so that `DagBag.get_dag()` also removes
the DAG from the local cache and returns `None` in that case. This
check only runs once the `min_serialized_dag_fetch_secs` interval has elapsed.
(cherry picked from commit e3b3c1fd1cf61b5d1bbe7aef11ddc85b9a7aa171)
---
airflow/models/dagbag.py | 11 ++++++++++-
airflow/models/serialized_dag.py | 4 ++--
tests/models/test_dagbag.py | 31 ++++++++++++++++++++++++++++++-
3 files changed, 42 insertions(+), 4 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index b78463b..be2701b 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -190,6 +190,8 @@ class DagBag(LoggingMixin):
# 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
# 2. check the last_updated column in SerializedDag table to see if Serialized DAG is updated
# 3. if (2) is yes, fetch the Serialized DAG.
+ # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
+ # if it exists and return None.
min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
if (
dag_id in self.dags_last_fetched
@@ -199,7 +201,14 @@ class DagBag(LoggingMixin):
dag_id=dag_id,
session=session,
)
- if sd_last_updated_datetime and sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
+ if not sd_last_updated_datetime:
+ self.log.warning("Serialized DAG %s no longer exists", dag_id)
+ del self.dags[dag_id]
+ del self.dags_last_fetched[dag_id]
+ del self.dags_hash[dag_id]
+ return None
+
+ if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
self._add_dag_from_db(dag_id=dag_id, session=session)
return self.dags.get(dag_id)
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 81448b6..4e8ebc4 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -261,7 +261,7 @@ class SerializedDagModel(Base):
@classmethod
@provide_session
- def get_last_updated_datetime(cls, dag_id: str, session: Session = None) -> datetime:
+ def get_last_updated_datetime(cls, dag_id: str, session: Session = None) -> Optional[datetime]:
"""
Get the date when the Serialized DAG associated to DAG was last updated
in serialized_dag table
@@ -295,7 +295,7 @@ class SerializedDagModel(Base):
:param session: ORM Session
:type session: Session
:return: DAG Hash
- :rtype: str
+ :rtype: str | None
"""
return session.query(cls.dag_hash).filter(cls.dag_id == dag_id).scalar()
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 359cd5c..0c52c49 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -19,7 +19,7 @@ import os
import shutil
import textwrap
import unittest
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from tempfile import NamedTemporaryFile, mkdtemp
from unittest import mock
from unittest.mock import patch
@@ -33,6 +33,7 @@ from airflow import models
from airflow.exceptions import SerializationError
from airflow.models import DagBag, DagModel
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils.dates import timezone as tz
from airflow.utils.session import create_session
from airflow.www.security import ApplessAirflowSecurityManager
@@ -311,6 +312,34 @@ class TestDagBag(unittest.TestCase):
assert dag_id == dag.dag_id
assert 2 == dagbag.process_file_calls
+ def test_dag_removed_if_serialized_dag_is_removed(self):
+ """
+ Test that if a DAG no longer exists in the serialized_dag table (e.g. its DAG file was removed),
+ get_dag returns None and the DAG is removed from the DagBag's caches.
+ """
+ from airflow.operators.dummy import DummyOperator
+
+ dag = models.DAG(
+ dag_id="test_dag_removed_if_serialized_dag_is_removed",
+ schedule_interval=None,
+ start_date=tz.datetime(2021, 10, 12),
+ )
+
+ with dag:
+ DummyOperator(task_id="task_1")
+
+ dagbag = DagBag(dag_folder=self.empty_dir, include_examples=False, read_dags_from_db=True)
+ dagbag.dags = {dag.dag_id: SerializedDAG.from_dict(SerializedDAG.to_dict(dag))}  # seed the in-memory cache with a serialized copy
+ dagbag.dags_last_fetched = {dag.dag_id: (tz.utcnow() - timedelta(minutes=2))}  # presumably older than MIN_SERIALIZED_DAG_FETCH_INTERVAL so get_dag re-checks the DB; confirm
+ dagbag.dags_hash = {dag.dag_id: mock.ANY}  # placeholder; only the key's presence/removal is asserted
+
+ assert SerializedDagModel.has_dag(dag.dag_id) is False  # the DAG was never written to serialized_dag
+
+ assert dagbag.get_dag(dag.dag_id) is None
+ assert dag.dag_id not in dagbag.dags
+ assert dag.dag_id not in dagbag.dags_last_fetched
+ assert dag.dag_id not in dagbag.dags_hash
+
def process_dag(self, create_dag):
"""
Helper method to process a file generated from the input create_dag function.