You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/14 04:01:41 UTC
[airflow] branch master updated: Do not create a separate process
for one task in CeleryExecutor (#8855)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new fc862a3 Do not create a separate process for one task in CeleryExecutor (#8855)
fc862a3 is described below
commit fc862a3edd010e65b9b3fe586855fe81807ee4e8
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Thu May 14 06:01:13 2020 +0200
Do not create a separate process for one task in CeleryExecutor (#8855)
---
airflow/executors/celery_executor.py | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 98fff4b..9ad762e 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -170,11 +170,11 @@ class CeleryExecutor(BaseExecutor):
task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))
if task_tuples_to_send:
- tasks = [t[4] for t in task_tuples_to_send]
+ first_task = next(t[4] for t in task_tuples_to_send)
# Celery state queries will get stuck if we do not use the same backend
# for all tasks.
- cached_celery_backend = tasks[0].backend
+ cached_celery_backend = first_task.backend
key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
self.log.debug('Sent all tasks.')
@@ -194,6 +194,11 @@ class CeleryExecutor(BaseExecutor):
self.last_state[key] = celery_states.PENDING
def _send_tasks_to_celery(self, task_tuples_to_send):
+ if len(task_tuples_to_send) == 1:
+ # One tuple, so send it in the main thread.
+ return [
+ send_task_to_executor(task_tuples_to_send[0])
+ ]
# Use chunks instead of a work queue to reduce context switching
# since tasks are roughly uniform in size
chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))