Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/14 03:10:34 UTC

[GitHub] [airflow] kaxil opened a new pull request #13662: Stop creating duplicate Dag File Processors

kaxil opened a new pull request #13662:
URL: https://github.com/apache/airflow/pull/13662


   When a dag file is being processed by a Dag File Processor and multiple callbacks are
   created for it, either via zombies or executor events, the dag file is added to
   the _file_path_queue and the manager launches a new process to
   process it, which it should not, since the dag file is already being
   processed. This eventually bypasses the _parallelism limit, especially when
   some dag files take a long time to process.
   
   This addresses the same issue as https://github.com/apache/airflow/pull/11875
   but, unlike that PR, does not exclude filepaths that were recently processed or that
   hit the run limit (which is only used in tests) when Callbacks are sent by the
   Agent. This is by design, as execution of Callbacks is critical. It is done
   with a caveat to avoid duplicate processors -- i.e. if a processor already exists
   for a filepath, instead of removing the filepath from the queue it is moved from the
   beginning of the queue to the end. This means that the processor for
   the filepath with a pending callback still runs before other filepaths are added.
   
   Tests are added to check the same.
   
   closes https://github.com/apache/airflow/issues/13047
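   For illustration only, the queue-move behavior described above can be sketched as
   follows (a minimal model, not Airflow's actual code; names like `in_progress` are
   made up for this sketch):

```python
# A path whose file is already being processed is moved from the front of
# the queue to the back, so its pending callback still runs before any
# paths that are added to the queue afterwards.
queue = ['dag_a.py', 'dag_b.py']
in_progress = {'dag_a.py'}  # dag_a.py already has a running processor

path = queue.pop(0)
if path in in_progress:
    queue.append(path)  # move to the back instead of dropping it

# New filepaths discovered later land behind the requeued one.
queue.extend(['dag_c.py'])
# queue == ['dag_b.py', 'dag_a.py', 'dag_c.py']
```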
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557486424



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -142,6 +142,72 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists for a filepath, a new processor won't be created
+        for that filepath. The filepath will just be removed from the queue.
+        """
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        manager._processors[file_1] = MagicMock()
+        manager.start_new_processes()
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue

Review comment:
       Check processor_factory is only called once
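       A hedged sketch of what that check could look like with `unittest.mock` (the
       `processor_factory` name follows the test's constructor argument; the call below
       just simulates the manager starting one new processor for `file_2`):

```python
from unittest.mock import MagicMock

processor_factory = MagicMock()

# Simulate the manager starting exactly one new processor (for file_2),
# since file_1 already has a running processor and is skipped.
processor_factory('file_2.py')

# The review suggestion: assert the factory was invoked only once.
processor_factory.assert_called_once()
assert processor_factory.call_count == 1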







[GitHub] [airflow] github-actions[bot] commented on pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#issuecomment-760482623


   [The Workflow run](https://github.com/apache/airflow/actions/runs/486235244) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671



##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processors, i.e. processors with the same filepath
+            if file_path in self._processors.keys():
+                # If the filepath is no longer in the queue and a callback exists to execute,
+                # we add the filepath to the end of the queue so it still runs before
+                # new filepaths are added to the queue, as we should run callbacks ASAP
+                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+                    self._file_path_queue.append(file_path)

Review comment:
       I am in two minds about this. We can remove these lines -- and let it get executed when the file is added again to file_path_queue







[GitHub] [airflow] github-actions[bot] commented on pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#issuecomment-761024998


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671



##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processors, i.e. processors with the same filepath
+            if file_path in self._processors.keys():
+                # If the filepath is no longer in the queue and a callback exists to execute,
+                # we add the filepath to the end of the queue so it still runs before
+                # new filepaths are added to the queue, as we should run callbacks ASAP
+                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+                    self._file_path_queue.append(file_path)

Review comment:
       I am in two minds about this. We can remove these lines -- and let it get executed when the file is added again to file_path_queue.
   
   (The case is tested / explained in `test_start_new_processes_with_same_filepath_with_callback` test below)
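   
   For illustration, the two behaviors under discussion can be contrasted with a
   minimal queue model (the function and its names are made up for this sketch, not
   Airflow's actual API):

```python
def drain_one(queue, processors, callbacks, requeue_for_callback=True):
    """Pop one filepath; skip it if a processor is already running it.

    With requeue_for_callback=True (the lines as written), a path with a
    pending callback is re-appended so the callback runs soon. With False
    (the lines removed), the path is dropped until it is re-queued later.
    """
    path = queue.pop(0)
    if path in processors:
        if requeue_for_callback and path not in queue and callbacks.get(path):
            queue.append(path)
        return None  # no new processor started
    return path  # a new processor would be started for this path

# Lines kept: file_1 is moved to the back of the queue.
q1 = ['file_1.py', 'file_2.py']
drain_one(q1, {'file_1.py': object()}, {'file_1.py': ['callback']})
# q1 == ['file_2.py', 'file_1.py']

# Lines removed: file_1 is simply dropped for now.
q2 = ['file_1.py', 'file_2.py']
drain_one(q2, {'file_1.py': object()}, {'file_1.py': ['callback']},
          requeue_for_callback=False)
# q2 == ['file_2.py']
```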







[GitHub] [airflow] kaxil merged pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #13662:
URL: https://github.com/apache/airflow/pull/13662


   





[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557648539



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -142,6 +142,72 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    @pytest.mark.backend("mysql", "postgres")
+    def test_start_new_processes_with_same_filepath(self):
+        """
+        Test that when a processor already exists for a filepath, a new processor won't be created
+        for that filepath. The filepath will just be removed from the queue.
+        """
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=MagicMock().return_value,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        file_1 = 'file_1.py'
+        file_2 = 'file_2.py'
+        file_3 = 'file_3.py'
+        manager._file_path_queue = [file_1, file_2, file_3]
+
+        manager._processors[file_1] = MagicMock()
+        manager.start_new_processes()
+
+        assert file_1 in manager._processors.keys()
+        assert file_2 in manager._processors.keys()
+        assert [file_3] == manager._file_path_queue

Review comment:
       @ashb Updated in https://github.com/apache/airflow/pull/13662/commits/b7fd3749d2db2ff8edc12fecec2cb7f6534a0e5f



