You are viewing a plain-text version of this content. The link to the canonical version (originally a "here" hyperlink) was not preserved in this copy.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/28 21:25:27 UTC

[airflow] 14/17: Avoid unintentional data loss when deleting DAGs (#20758)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0740c08da7ef5b8d98bbd09f473724816ab1e61c
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jan 10 11:55:51 2022 -0800

    Avoid unintentional data loss when deleting DAGs (#20758)
    
    (cherry picked from commit 5980d2b05eee484256c634d5efae9410265c65e9)
---
 airflow/api/common/delete_dag.py    | 18 +++++++++++++++---
 tests/api/common/test_delete_dag.py | 14 ++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c448127..5e0afa8 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -18,7 +18,7 @@
 """Delete DAGs APIs."""
 import logging
 
-from sqlalchemy import or_
+from sqlalchemy import and_, or_
 
 from airflow import models
 from airflow.exceptions import AirflowException, DagNotFound
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also delete all of its subdags
+    dags_to_delete_query = session.query(DagModel.dag_id).filter(
+        or_(
+            DagModel.dag_id == dag_id,
+            and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+        )
+    )
+    dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
+
     # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
     # There may be a lag, so explicitly removes serialized DAG here.
     if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
@@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
         if hasattr(model, "dag_id"):
             if keep_records_in_log and model.__name__ == 'Log':
                 continue
-            cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
-            count += session.query(model).filter(cond).delete(synchronize_session='fetch')
+            count += (
+                session.query(model)
+                .filter(model.dag_id.in_(dags_to_delete))
+                .delete(synchronize_session='fetch')
+            )
     if dag.is_subdag:
         parent_dag_id, task_id = dag_id.rsplit(".", 1)
         for model in TaskFail, models.TaskInstance:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 0eb058a..d9dc0b0 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -162,3 +162,17 @@ class TestDeleteDAGSuccessfulDelete:
         self.check_dag_models_exists()
         delete_dag(dag_id=self.key, keep_records_in_log=False)
         self.check_dag_models_removed(expect_logs=0)
+
+    def test_delete_dag_preserves_other_dags(self):
+
+        self.setup_dag_models()
+
+        with create_session() as session:
+            session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
+            session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))
+
+        delete_dag(self.key)
+
+        with create_session() as session:
+            assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
+            assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1