Posted to commits@airflow.apache.org by as...@apache.org on 2020/06/15 09:17:05 UTC

[airflow] branch master updated: Send Celery tasks from main process when sync_parallelism is 1 (#9253)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new e5ce87a  Send Celery tasks from main process when sync_parallelism is 1 (#9253)
e5ce87a is described below

commit e5ce87ac25d3a2f5a87a14284f528072d19055f6
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Jun 15 10:16:04 2020 +0100

    Send Celery tasks from main process when sync_parallelism is 1 (#9253)
    
    In attempting to debug some other behaviour I discovered that we spawn a
    "pool" of 1 process (the main process, plus one worker), which makes it
    hard to attach a debugger.
    
    This changes it so that if a single process is configured, the tasks are
    sent directly from the main process, preserving the ability to step
    through with a debugger.
---
 airflow/executors/celery_executor.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index cf3f18c..532b0ca 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -199,11 +199,10 @@ class CeleryExecutor(BaseExecutor):
                     self.last_state[key] = celery_states.PENDING
 
     def _send_tasks_to_celery(self, task_tuples_to_send):
-        if len(task_tuples_to_send) == 1:
-            # One tuple, so send it in the main thread.
-            return [
-                send_task_to_executor(task_tuples_to_send[0])
-            ]
+        if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
+            # One tuple, or max one process -> send it in the main thread.
+            return list(map(send_task_to_executor, task_tuples_to_send))
+
         # Use chunks instead of a work queue to reduce context switching
         # since tasks are roughly uniform in size
         chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
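
For illustration, here is a minimal, self-contained sketch (not the Airflow
source) of the pattern this commit applies in _send_tasks_to_celery: with a
single task tuple, or with self._sync_parallelism equal to 1, the send happens
inline in the main process (so breakpoints and step-through debugging keep
working), and only otherwise is a multiprocessing pool spawned. The names
send_one and send_tasks below are illustrative stand-ins, not Airflow's API.

    from multiprocessing import Pool


    def send_one(task):
        # Stand-in for send_task_to_executor(): returns a (task, result) pair.
        return task, f"sent:{task}"


    def send_tasks(tasks, sync_parallelism):
        if len(tasks) == 1 or sync_parallelism == 1:
            # One task, or a single-process configuration: stay in the main
            # process so a debugger can step straight through this call.
            return list(map(send_one, tasks))

        # Multiple tasks and parallelism > 1: fan out to a process pool in
        # fixed-size chunks, mirroring the chunksize logic in the diff above.
        chunksize = max(1, len(tasks) // sync_parallelism)
        with Pool(processes=sync_parallelism) as pool:
            return pool.map(send_one, tasks, chunksize=chunksize)


    if __name__ == "__main__":
        print(send_tasks(["t1", "t2", "t3"], sync_parallelism=1))  # inline path
        print(send_tasks(["t1", "t2", "t3"], sync_parallelism=2))  # pool path

Calling send_tasks with sync_parallelism=1 stays on the inline path that a
debugger can step through; any larger value exercises the pool path, where the
work runs in child processes.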