Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/27 05:33:03 UTC

[GitHub] [airflow] pingzh opened a new pull request #11875: Dont create duplicate dag file processors

pingzh opened a new pull request #11875:
URL: https://github.com/apache/airflow/pull/11875


   Context: when a dag file is being processed and multiple callbacks are
   created for it, either via zombies or executor events, the dag file is
   added to the _file_path_queue again and the manager launches a new
   process for it, which it should not do since the file is already being
   processed. Eventually this bypasses _parallelism, especially when some
   dag files take a long time to process. We have seen ~200 dag file
   processors on the scheduler even though we set _parallelism to 60. The
   extra processors cause CPU spikes, which in turn make dag file
   processing even slower. In the end, the scheduler is taken down.
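   
   For illustration, here is a minimal, self-contained sketch of the failure mode described above. It is not the actual Airflow code: only the attribute names `_file_path_queue`, `_processors` and `_parallelism` are taken from the manager; everything else is simplified.
   
    ```python
    # Toy model of the dag file processor manager behaviour described above.
    class ToyManager:
        def __init__(self, parallelism=2):
            self._parallelism = parallelism
            self._file_path_queue = []  # files waiting for a processor
            self._processors = {}       # file_path -> running processor (a fake pid here)
            self._next_pid = 1

        def add_callback(self, file_path):
            # A zombie/executor-event callback re-queues the file at the front,
            # regardless of whether a processor for it is still running.
            if file_path in self._file_path_queue:
                self._file_path_queue.remove(file_path)
            self._file_path_queue.insert(0, file_path)

        def start_new_processes(self):
            while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
                file_path = self._file_path_queue.pop(0)
                # No "is this file already being processed?" check: a second
                # processor is started and overwrites the bookkeeping entry,
                # while the first keeps running untracked.
                self._processors[file_path] = self._next_pid
                self._next_pid += 1


    manager = ToyManager()
    manager.add_callback("/dags/slow_dag.py")
    manager.start_new_processes()               # processor 1 starts
    manager.add_callback("/dags/slow_dag.py")   # callback arrives while it is still running
    manager.start_new_processes()               # processor 2 starts for the same file
    print(manager._next_pid - 1, "processors started for a single dag file")  # -> 2
    ```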
   
   





[GitHub] [airflow] ashb commented on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-739794104


   I'm not yet convinced that this is a) actually a bug, or b) the right solution.
   
   Changing the milestone for now, and it can come in 2.0.x or 2.1





[GitHub] [airflow] kaxil closed pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
kaxil closed pull request #11875:
URL: https://github.com/apache/airflow/pull/11875


   





[GitHub] [airflow] ashb commented on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-717244661


   Are you sure this is still a problem on mainline? Looking at `start_new_processes`:
   
   ```python
       def start_new_processes(self):
           """Start more processors if we have enough slots and files to process"""
           while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
               file_path = self._file_path_queue.pop(0)
               callback_to_execute_for_file = self._callback_to_execute[file_path]
               processor = self._processor_factory(
                   file_path,
                   callback_to_execute_for_file,
                   self._dag_ids,
                   self._pickle_dags)
   
               del self._callback_to_execute[file_path]
               Stats.incr('dag_processing.processes')
   
               processor.start()
               self.log.debug(
                   "Started a process (PID: %s) to generate tasks for %s",
                   processor.pid, file_path
               )
               self._processors[file_path] = processor
               self.waitables[processor.waitable_handle] = processor
   ```
   
   I can't see at first glance how `self._parallelism - len(self._processors) > 0` would ever lead to too many processes.





[GitHub] [airflow] pingzh edited a comment on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
pingzh edited a comment on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-717392179


   > Are you sure this is still a problem on mainline? Looking at `start_new_processes`:
   > 
   > ```python
   >     def start_new_processes(self):
   >         """Start more processors if we have enough slots and files to process"""
   >         while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
   >             file_path = self._file_path_queue.pop(0)
   >             callback_to_execute_for_file = self._callback_to_execute[file_path]
   >             processor = self._processor_factory(
   >                 file_path,
   >                 callback_to_execute_for_file,
   >                 self._dag_ids,
   >                 self._pickle_dags)
   > 
   >             del self._callback_to_execute[file_path]
   >             Stats.incr('dag_processing.processes')
   > 
   >             processor.start()
   >             self.log.debug(
   >                 "Started a process (PID: %s) to generate tasks for %s",
   >                 processor.pid, file_path
   >             )
   >             self._processors[file_path] = processor
   >             self.waitables[processor.waitable_handle] = processor
   > ```
   > 
   > I can't see at first glance how `self._parallelism - len(self._processors) > 0` would ever lead to too many processes.
   
   Yes, it is still an issue; the main logic for launching new dag file processors has not changed much between `1.10.4` and the `master` branch. We also cherry-picked https://github.com/apache/airflow/pull/7597 onto our `1.10.4` version.
   
   The issue does not happen often.
   
   This is the incident that led us to find this issue. As you can see, the same dag file was being processed many times (processing this dag file usually takes more than 15 minutes):
   
   ![image](https://user-images.githubusercontent.com/8662365/97335898-9cef5600-183b-11eb-99fa-6e21e6ef78a2.png)
   
   When the manager adds the callback to the `_file_path_queue`, it does not check whether this dag file is currently being processed or within its cool-down period, which leads to multiple dag file processors processing the same dag file.
   
   As for exceeding the `_parallelism`, I have lost some context about how exactly it got into that state :(
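   
   To make the missing check concrete, here is one hypothetical way the enqueue path could guard against this. It is only an illustration of the gap described above, not this PR's actual change (the PR adds a check to `start_new_processes` instead; see the review comments below), and the method name and signature are simplified.
   
    ```python
    # Hypothetical enqueue-time guard, NOT the actual Airflow code and NOT this
    # PR's change; shown only to illustrate the check that is currently missing
    # when a callback is queued for a file that is still being processed.
    def _add_callback_to_queue(self, file_path, request):
        self._callback_to_execute[file_path].append(request)
        if file_path in self._processors:
            # A processor for this file is still running: keep the callback
            # registered, but do not queue the file for a second processor.
            return
        if file_path in self._file_path_queue:
            self._file_path_queue.remove(file_path)
        self._file_path_queue.insert(0, file_path)
    ```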
   
   








[GitHub] [airflow] potiuk commented on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-739793454


   Hey @kaxil @ashb - do you want to make it part of 2.0.0rc1? Or should we change the milestone?





[GitHub] [airflow] ashb commented on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-717922254


   > When the manager adds the callback to the `_file_path_queue`, it does not check whether this dag file is currently being processed or within its cool-down period,
   
   Yes, this is "by design" -- if there's a callback we need to execute it "now" to run it.
   
   https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L713-L718
   
   Given that we remove it from the list if it's already there, and given the existing checks, I'm not sure the two "issues" are related (1. jumping up the queue/ignoring the cool-down period, and 2. going beyond concurrency limits).
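   
   For readers without the linked file open, a rough paraphrase in code of the behaviour described here (simplified names and signature, reconstructed from this comment rather than copied from `dag_processing.py`):
   
    ```python
    # Paraphrase: when a callback arrives for a file, the file jumps the queue
    # so the callback is executed as soon as a processor slot frees up.
    def _add_callback_to_queue(self, file_path, request):
        self._callback_to_execute[file_path].append(request)
        if file_path in self._file_path_queue:
            # Already queued: drop the old entry so the file is not queued twice...
            self._file_path_queue.remove(file_path)
        # ...and re-add it at the front, ahead of files from the regular scan.
        self._file_path_queue.insert(0, file_path)
    ```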
   
   





[GitHub] [airflow] pingzh commented on pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
pingzh commented on pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#issuecomment-727037517


   @kaxil and @ashb, a friendly reminder about this PR. Thanks.





[GitHub] [airflow] pingzh commented on a change in pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#discussion_r512879073



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -138,6 +138,29 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    def test_start_new_processes_with_files_to_exclude(self):
+        processor_factory = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True)
+        f1 = '/tmp/f1.py'
+        f2 = '/tmp/f2.py'
+        manager._file_path_queue = [f1, f2]
+        files_paths_to_exclude_in_this_loop = {f1}
+
+        manager.start_new_processes(files_paths_to_exclude_in_this_loop)

Review comment:
       If the manager loop called `prepare_file_path_queue()` on every loop, we would not need this change (passing `files_paths_to_exclude_in_this_loop` to `start_new_processes`).
   
   It is only called when `if not self._file_path_queue` is true. If we remove this change, we would also need to do some de-duplication work in `prepare_file_path_queue`.







[GitHub] [airflow] pingzh commented on a change in pull request #11875: Dont create duplicate dag file processors

Posted by GitBox <gi...@apache.org>.
pingzh commented on a change in pull request #11875:
URL: https://github.com/apache/airflow/pull/11875#discussion_r512880535



##########
File path: tests/utils/test_dag_processing.py
##########
@@ -138,6 +138,29 @@ def test_max_runs_when_no_files(self):
         child_pipe.close()
         parent_pipe.close()
 
+    def test_start_new_processes_with_files_to_exclude(self):
+        processor_factory = MagicMock()
+        manager = DagFileProcessorManager(
+            dag_directory='directory',
+            max_runs=1,
+            processor_factory=processor_factory,
+            processor_timeout=timedelta.max,
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True)
+        f1 = '/tmp/f1.py'
+        f2 = '/tmp/f2.py'
+        manager._file_path_queue = [f1, f2]
+        files_paths_to_exclude_in_this_loop = {f1}
+
+        manager.start_new_processes(files_paths_to_exclude_in_this_loop)

Review comment:
       I chose to handle `files_paths_to_exclude_in_this_loop` inside `start_new_processes` because I was thinking that it is the single place where we do a `pop(0)` and start a new process, so adding the check there guarantees we won't double-process dag files.
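   
   For context, a rough reconstruction of what that check might look like; the parameter name and its placement in `start_new_processes` come from the test and this comment, but the actual diff is not shown in this thread, so details such as what happens to a skipped file are guesses.
   
    ```python
    # Sketch only, not the actual patch: the exclusion check sits next to the
    # single pop(0) so an excluded (already-processing) file never gets a
    # second dag file processor in the same loop.
    def start_new_processes(self, files_paths_to_exclude_in_this_loop=frozenset()):
        skipped = []
        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
            file_path = self._file_path_queue.pop(0)
            if file_path in files_paths_to_exclude_in_this_loop:
                # Do not start a duplicate processor for this file in this loop.
                skipped.append(file_path)
                continue
            processor = self._processor_factory(
                file_path,
                self._callback_to_execute[file_path],
                self._dag_ids,
                self._pickle_dags,
            )
            del self._callback_to_execute[file_path]
            processor.start()
            self._processors[file_path] = processor
        # Guess: re-queue skipped files so they are handled once their current
        # processor has finished.
        self._file_path_queue.extend(skipped)
    ```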



