Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/15 16:40:35 UTC

[airflow] branch master updated: Stop creating duplicate Dag File Processors (#13662)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 32f5953  Stop creating duplicate Dag File Processors (#13662)
32f5953 is described below

commit 32f59534cbdb8188e4c8f49d7dfbb4b915eaeb4d
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Fri Jan 15 16:40:20 2021 +0000

    Stop creating duplicate Dag File Processors (#13662)
    
    When a dag file is being processed by a Dag File Processor and multiple callbacks
    are created for it (either via zombies or executor events), the dag file is added
    back to the _file_path_queue and the manager launches a new process for it, even
    though the file is already being processed. Because self._processors is just a
    dict keyed by file path, multiple processors for the same file count as a single
    entry, so the number of running processes can eventually exceed _parallelism,
    especially when some dag files take a long time to process.
    
    This addresses the same issue as https://github.com/apache/airflow/pull/11875,
    but unlike that PR it does not exclude file paths that were recently processed
    or that have hit the run limit (which is only used in tests) when callbacks are
    sent by the Agent. This is by design, as the execution of callbacks is critical.
    There is one caveat to avoid a duplicate processor: if a processor already
    exists for a file path, that file path is removed from the queue, and the
    processor that runs the callback is started when the file path is added again
    on the next loop.
    
    Tests are added to verify this behaviour.
    
    closes https://github.com/apache/airflow/issues/13047
    closes https://github.com/apache/airflow/pull/11875
---
 airflow/utils/dag_processing.py    | 11 ++++++++++-
 tests/utils/test_dag_processing.py | 39 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
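
To illustrate the failure mode described above, here is a minimal, self-contained
sketch. ToyProcessor and launch_loop are hypothetical stand-ins, not the actual
Airflow classes; the point is only how a dict keyed by file path undercounts the
processors that were actually started:

    # Minimal sketch of the pre-fix behaviour (toy names, not Airflow code).
    class ToyProcessor:
        """Stands in for a DAG file processor subprocess."""

        def __init__(self, file_path):
            self.file_path = file_path


    def launch_loop(file_path_queue, processors, parallelism):
        """Pop queued paths while there appear to be free slots (pre-fix logic)."""
        launched = []
        while parallelism - len(processors) > 0 and file_path_queue:
            file_path = file_path_queue.pop(0)
            proc = ToyProcessor(file_path)
            launched.append(proc)
            # A duplicate path overwrites the previous dict entry, so
            # len(processors) undercounts the subprocesses actually started.
            processors[file_path] = proc
        return launched


    processors = {}
    # 'a.py' is queued twice, e.g. once for scheduling and once for a callback.
    queue = ['a.py', 'a.py', 'b.py', 'c.py']
    started = launch_loop(queue, processors, parallelism=2)
    print(len(started))     # 3 -> three processors were started ...
    print(len(processors))  # 2 -> ... but only two are tracked, bypassing the limit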

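The patch below adds two guards against this. Roughly, and with simplified toy
signatures rather than the actual DagFileProcessorManager methods:

    # Simplified sketch of the two guards added by this commit (toy signatures;
    # the real methods live on DagFileProcessorManager).
    def add_callback_to_queue(file_path_queue, callbacks, file_path, request):
        callbacks.setdefault(file_path, []).append(request)
        # Drop every queued occurrence of this path, then put it at the front,
        # so the callback is picked up with priority but queued only once.
        file_path_queue[:] = [p for p in file_path_queue if p != file_path]
        file_path_queue.insert(0, file_path)


    def start_new_processes(file_path_queue, processors, parallelism, factory):
        while parallelism - len(processors) > 0 and file_path_queue:
            file_path = file_path_queue.pop(0)
            # Skip a path that already has a running processor; it will be
            # queued again on a later loop and any pending callback runs then.
            if file_path in processors:
                continue
            processors[file_path] = factory(file_path)

Together these keep at most one tracked processor per file path while making sure
a queued callback is not lost, only deferred to a later scheduling loop.
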
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index e66803a..7e98c11 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -714,7 +714,12 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
         self._callback_to_execute[request.full_filepath].append(request)
         # Callback has a higher priority over DAG Run scheduling
         if request.full_filepath in self._file_path_queue:
-            self._file_path_queue.remove(request.full_filepath)
+            # Remove file paths matching request.full_filepath from self._file_path_queue
+            # Since we are already going to use that filepath to run callback,
+            # there is no need to have same file path again in the queue
+            self._file_path_queue = [
+                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+            ]
         self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
@@ -988,6 +993,10 @@ class DagFileProcessorManager(LoggingMixin):  # pylint: disable=too-many-instanc
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processor i.e. processor with the same filepath
+            if file_path in self._processors.keys():
+                continue
+
             callback_to_execute_for_file = self._callback_to_execute[file_path]
             processor = self._processor_factory(
                 file_path, callback_to_execute_for_file, self._dag_ids, self._pickle_dags
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
index ffcf00e..ad8ef5a 100644
--- a/tests/utils/test_dag_processing.py
+++ b/tests/utils/test_dag_processing.py
@@ -142,6 +142,45 @@ class TestDagFileProcessorManager(unittest.TestCase):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists for a filepath, a new processor won't be
+        created for that filepath; the filepath will just be removed from the list.
+        """
+        processor_factory_mock = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory_mock,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        # Mock that only one processor exists. This processor runs with 'file_1'
+        manager._processors[file_1] = MagicMock()
+        # Start New Processes
+        manager.start_new_processes()
+
+        # Because of the config: '[scheduler] parsing_processes = 2'
+        # verify that only one extra process is created
+        # and since a processor with 'file_1' already exists,
+        # even though it is first in '_file_path_queue'
+        # a new processor is created with 'file_2' and not 'file_1'.
+        processor_factory_mock.assert_called_once_with('file_2.py', [], [], False)
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue
+
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
         manager = DagFileProcessorManager(
             dag_directory='directory',