Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/14 14:32:10 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors

kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671



##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
         """Start more processors if we have enough slots and files to process"""
         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
             file_path = self._file_path_queue.pop(0)
+            # Stop creating duplicate processors, i.e. processors with the same file path
+            if file_path in self._processors.keys():
+                # If the file path is no longer in the queue and a callback exists to
+                # execute, we add the file path back to the end of the queue so that it
+                # still runs before newly added file paths, since callbacks should run ASAP
+                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+                    self._file_path_queue.append(file_path)

Review comment:
       I am in two minds about this. We could remove these lines and let the callback get executed when the file is added to file_path_queue again.
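
       For context, here is a minimal, self-contained sketch of the simpler alternative suggested above: skip duplicate file paths outright and let any pending callback run the next time the file is re-added to the queue. This is an illustrative sketch, not the actual Airflow implementation; the class name and the object() placeholder for a processor are hypothetical, while _parallelism, _processors, _file_path_queue and _callback_to_execute mirror the attributes used in the diff.

           class DagFileProcessorManagerSketch:
               """Hypothetical stand-in for the manager in airflow/utils/dag_processing.py."""

               def __init__(self, parallelism):
                   self._parallelism = parallelism
                   self._processors = {}           # file_path -> running processor
                   self._file_path_queue = []      # file paths waiting for a processor
                   self._callback_to_execute = {}  # file_path -> pending callbacks

               def start_new_processes(self):
                   """Start more processors if we have enough slots and files to process."""
                   while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
                       file_path = self._file_path_queue.pop(0)
                       # Alternative discussed in the review: skip paths that already
                       # have a running processor instead of re-queuing them; a pending
                       # callback then executes when the path is next added to the queue.
                       if file_path in self._processors:
                           continue
                       self._processors[file_path] = object()  # placeholder for a real processor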




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org