Posted to commits@airflow.apache.org by ep...@apache.org on 2021/09/15 13:34:03 UTC

[airflow] branch main updated: Fix deleting of zipped Dags in Serialized Dag Table (#18243)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f7c10b  Fix deleting of zipped Dags in Serialized Dag Table (#18243)
9f7c10b is described below

commit 9f7c10bb88b1dd18069ebb671ae7972cffd180d6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Sep 15 14:33:40 2021 +0100

    Fix deleting of zipped Dags in Serialized Dag Table (#18243)
    
    The file locations of DAGs inside zip archives are not listed correctly when removing deleted DAGs
    from the serialized DAG table, so the delete query that purges deleted DAGs also deletes DAGs that
    live inside zip archives. The same applies to DagCode.remove_deleted_code.
    
    This PR fixes it by listing the file paths exactly as they are stored in SerializedDagModel so that the delete query works correctly.
    
    Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
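
    A minimal sketch of the idea, not the committed code (paths and the helper name below are
    assumptions for illustration): serialized DAGs loaded from a zip archive store filelocs such as
    "/dags/my_dags.zip/my_dag.py", while the directory scan only yields "/dags/my_dags.zip", so a
    "fileloc not in file_paths" cleanup matched and deleted those rows. Expanding each archive into
    per-member paths keeps the two lists in sync; the actual fix additionally filters members with
    might_contain_dag.

    # Illustrative only; example paths and helper name are assumptions, not Airflow API.
    import os
    import zipfile

    def expand_zip_filelocs(file_paths):
        """Expand zip archives into the archive/member paths stored as DAG filelocs."""
        filelocs = []
        for path in file_paths:
            if zipfile.is_zipfile(path):
                with zipfile.ZipFile(path) as z:
                    # The committed fix also filters members with might_contain_dag().
                    filelocs.extend(os.path.join(path, info.filename) for info in z.infolist())
            else:
                filelocs.append(path)
        return filelocs

    # expand_zip_filelocs(["/dags/my_dags.zip", "/dags/plain_dag.py"])
    # -> ["/dags/my_dags.zip/my_dag.py", "/dags/plain_dag.py"]  (assuming the zip contains my_dag.py)
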
---
 airflow/dag_processing/manager.py    | 24 +++++++++++++++++++++---
 tests/dag_processing/test_manager.py | 30 ++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index b235250..5c39277 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -26,6 +26,7 @@ import random
 import signal
 import sys
 import time
+import zipfile
 from collections import defaultdict
 from datetime import datetime, timedelta
 from importlib import import_module
@@ -45,7 +46,7 @@ from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest
-from airflow.utils.file import list_py_file_paths
+from airflow.utils.file import list_py_file_paths, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.net import get_hostname
@@ -661,12 +662,29 @@ class DagFileProcessorManager(LoggingMixin):
             except Exception:
                 self.log.exception("Error removing old import errors")
 
-            SerializedDagModel.remove_deleted_dags(self._file_paths)
+            # If the file path is a zip archive, list the full path of each python file inside it.
+            # Without this, SerializedDagModel.remove_deleted_dags would delete zipped dags,
+            # and likewise DagCode.remove_deleted_code.
+            dag_filelocs = []
+            for fileloc in self._file_paths:
+                if zipfile.is_zipfile(fileloc):
+                    with zipfile.ZipFile(fileloc) as z:
+                        dag_filelocs.extend(
+                            [
+                                os.path.join(fileloc, info.filename)
+                                for info in z.infolist()
+                                if might_contain_dag(info.filename, True, z)
+                            ]
+                        )
+                else:
+                    dag_filelocs.append(fileloc)
+
+            SerializedDagModel.remove_deleted_dags(dag_filelocs)
             DagModel.deactivate_deleted_dags(self._file_paths)
 
             from airflow.models.dagcode import DagCode
 
-            DagCode.remove_deleted_code(self._file_paths)
+            DagCode.remove_deleted_code(dag_filelocs)
 
     def _print_stat(self):
         """Occasionally print out stats about how fast the files are getting processed"""
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 2c62939..3023fe5 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -45,6 +45,7 @@ from airflow.dag_processing.manager import (
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.jobs.local_task_job import LocalTaskJob as LJ
 from airflow.models import DagBag, DagModel, TaskInstance as TI, errors
+from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.utils import timezone
@@ -54,6 +55,7 @@ from airflow.utils.session import create_session
 from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunType
 from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
+from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags
 
@@ -110,9 +112,13 @@ class FakeDagFileProcessorRunner(DagFileProcessorProcess):
 class TestDagFileProcessorManager:
     def setup_method(self):
         clear_db_runs()
+        clear_db_serialized_dags()
+        clear_db_dags()
 
     def teardown_class(self):
         clear_db_runs()
+        clear_db_serialized_dags()
+        clear_db_dags()
 
     def run_processor_manager_one_loop(self, manager, parent_pipe):
         if not manager._async_mode:
@@ -747,6 +753,30 @@ class TestDagFileProcessorManager:
 
         statsd_timing_mock.assert_called_with('dag_processing.last_duration.temp_dag', last_runtime)
 
+    def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
+        """Test DagFileProcessorManager._refresh_dag_dir method"""
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+        dagbag = DagBag(dag_folder=tmpdir, include_examples=False)
+        zipped_dag_path = os.path.join(TEST_DAGS_FOLDER, "test_zip.zip")
+        dagbag.process_file(zipped_dag_path)
+        dag = dagbag.get_dag("test_zip_dag")
+        dag.sync_to_db()
+        SerializedDagModel.write_dag(dag)
+        manager.last_dag_dir_refresh_time = timezone.utcnow() - timedelta(minutes=10)
+        manager._refresh_dag_dir()
+        # Assert dag not deleted in SDM
+        assert SerializedDagModel.has_dag('test_zip_dag')
+        # Assert code not deleted
+        assert DagCode.has_dag(dag.fileloc)
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):