You are viewing a plain text version of this content; the canonical link (a hyperlink in the original HTML message, lost in this plain-text rendering) points to the message in the Apache mailing-list archive.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/01/10 19:58:29 UTC
[airflow] branch main updated: Avoid unintentional data loss when deleting DAGs (#20758)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5980d2b Avoid unintentional data loss when deleting DAGs (#20758)
5980d2b is described below
commit 5980d2b05eee484256c634d5efae9410265c65e9
Author: Sam Wheating <sa...@shopify.com>
AuthorDate: Mon Jan 10 11:55:51 2022 -0800
Avoid unintentional data loss when deleting DAGs (#20758)
---
airflow/api/common/delete_dag.py | 18 +++++++++++++++---
tests/api/common/test_delete_dag.py | 14 ++++++++++++++
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git a/airflow/api/common/delete_dag.py b/airflow/api/common/delete_dag.py
index c448127..5e0afa8 100644
--- a/airflow/api/common/delete_dag.py
+++ b/airflow/api/common/delete_dag.py
@@ -18,7 +18,7 @@
"""Delete DAGs APIs."""
import logging
-from sqlalchemy import or_
+from sqlalchemy import and_, or_
from airflow import models
from airflow.exceptions import AirflowException, DagNotFound
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if dag is None:
raise DagNotFound(f"Dag id {dag_id} not found")
+ # deleting a DAG should also delete all of its subdags
+ dags_to_delete_query = session.query(DagModel.dag_id).filter(
+ or_(
+ DagModel.dag_id == dag_id,
+ and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+ )
+ )
+ dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
+
# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly removes serialized DAG here.
if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
@@ -65,8 +74,11 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
if hasattr(model, "dag_id"):
if keep_records_in_log and model.__name__ == 'Log':
continue
- cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
- count += session.query(model).filter(cond).delete(synchronize_session='fetch')
+ count += (
+ session.query(model)
+ .filter(model.dag_id.in_(dags_to_delete))
+ .delete(synchronize_session='fetch')
+ )
if dag.is_subdag:
parent_dag_id, task_id = dag_id.rsplit(".", 1)
for model in TaskFail, models.TaskInstance:
diff --git a/tests/api/common/test_delete_dag.py b/tests/api/common/test_delete_dag.py
index 0eb058a..d9dc0b0 100644
--- a/tests/api/common/test_delete_dag.py
+++ b/tests/api/common/test_delete_dag.py
@@ -162,3 +162,17 @@ class TestDeleteDAGSuccessfulDelete:
self.check_dag_models_exists()
delete_dag(dag_id=self.key, keep_records_in_log=False)
self.check_dag_models_removed(expect_logs=0)
+
+ def test_delete_dag_preserves_other_dags(self):
+
+ self.setup_dag_models()
+
+ with create_session() as session:
+ session.add(DM(dag_id=self.key + ".other_dag", fileloc=self.dag_file_path))
+ session.add(DM(dag_id=self.key + ".subdag", fileloc=self.dag_file_path, is_subdag=True))
+
+ delete_dag(self.key)
+
+ with create_session() as session:
+ assert session.query(DM).filter(DM.dag_id == self.key + ".other_dag").count() == 1
+ assert session.query(DM).filter(DM.dag_id.like(self.key + "%")).count() == 1