You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/06/15 09:17:05 UTC
[airflow] branch master updated: Send Celery tasks from main
process when sync_parallelism is 1 (#9253)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new e5ce87a Send Celery tasks from main process when sync_parallelism is 1 (#9253)
e5ce87a is described below
commit e5ce87ac25d3a2f5a87a14284f528072d19055f6
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Jun 15 10:16:04 2020 +0100
Send Celery tasks from main process when sync_parallelism is 1 (#9253)
In attempting to debug some other behaviour I discovered that we spawn a
"pool" of 1 process (main process, plus another) -- which makes it
hard to use a debugger against.
This changes it so that if a single process is configured that it is
just run directly, preserving the ability to step through with a
debugger
---
airflow/executors/celery_executor.py | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index cf3f18c..532b0ca 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -199,11 +199,10 @@ class CeleryExecutor(BaseExecutor):
self.last_state[key] = celery_states.PENDING
def _send_tasks_to_celery(self, task_tuples_to_send):
- if len(task_tuples_to_send) == 1:
- # One tuple, so send it in the main thread.
- return [
- send_task_to_executor(task_tuples_to_send[0])
- ]
+ if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
+ # One tuple, or max one process -> send it in the main thread.
+ return list(map(send_task_to_executor, task_tuples_to_send))
+
# Use chunks instead of a work queue to reduce context switching
# since tasks are roughly uniform in size
chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))