You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/02/13 19:26:02 UTC
[airflow] branch main updated: Ensure Serialized DAG is deleted (#29407)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 02a2efeae4 Ensure Serialized DAG is deleted (#29407)
02a2efeae4 is described below
commit 02a2efeae409bddcfedafe273fffc353595815cc
Author: Michael Petro <40...@users.noreply.github.com>
AuthorDate: Mon Feb 13 14:25:49 2023 -0500
Ensure Serialized DAG is deleted (#29407)
* dag processing manager, serialized dag deletion: only filter on the dag folder when dag processing runs in a standalone processor
* Remove redundant change
* remove deleted dags from serialized dags, fix None check (use `cls.processor_subdir.is_(None)` instead of `cls.processor_subdir is None` in the filter)
* serialized_dag tests, add processor_subdir to remove_deleted_dags test
---
airflow/models/serialized_dag.py | 2 +-
tests/models/test_serialized_dag.py | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/airflow/models/serialized_dag.py b/airflow/models/serialized_dag.py
index 3e1ec0c70c..53e5e2ccbd 100644
--- a/airflow/models/serialized_dag.py
+++ b/airflow/models/serialized_dag.py
@@ -251,7 +251,7 @@ class SerializedDagModel(Base):
cls.fileloc_hash.notin_(alive_fileloc_hashes),
cls.fileloc.notin_(alive_dag_filelocs),
or_(
- cls.processor_subdir is None,
+ cls.processor_subdir.is_(None),
cls.processor_subdir == processor_subdir,
),
)
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index d4e7d4e923..b425cd8f65 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -169,7 +169,7 @@ class TestSerializedDagModel:
# remove repeated files for those DAGs that define multiple dags in the same file (set comprehension)
example_dag_files = list({dag.fileloc for dag in filtered_example_dags_list})
example_dag_files.remove(dag_removed_by_file.fileloc)
- SDM.remove_deleted_dags(example_dag_files)
+ SDM.remove_deleted_dags(example_dag_files, processor_subdir="/tmp/test")
assert not SDM.has_dag(dag_removed_by_file.dag_id)
def test_bulk_sync_to_db(self):