You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/14 04:01:41 UTC

[airflow] branch master updated: Do not create a separate process for one task in CeleryExecutor (#8855)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new fc862a3  Do not create a separate process for one task in CeleryExecutor (#8855)
fc862a3 is described below

commit fc862a3edd010e65b9b3fe586855fe81807ee4e8
Author: Kamil BreguĊ‚a <mi...@users.noreply.github.com>
AuthorDate: Thu May 14 06:01:13 2020 +0200

    Do not create a separate process for one task in CeleryExecutor (#8855)
---
 airflow/executors/celery_executor.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 98fff4b..9ad762e 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -170,11 +170,11 @@ class CeleryExecutor(BaseExecutor):
             task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))
 
         if task_tuples_to_send:
-            tasks = [t[4] for t in task_tuples_to_send]
+            first_task = next(t[4] for t in task_tuples_to_send)
 
             # Celery state queries will stuck if we do not use one same backend
             # for all tasks.
-            cached_celery_backend = tasks[0].backend
+            cached_celery_backend = first_task.backend
 
             key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
             self.log.debug('Sent all tasks.')
@@ -194,6 +194,11 @@ class CeleryExecutor(BaseExecutor):
                     self.last_state[key] = celery_states.PENDING
 
     def _send_tasks_to_celery(self, task_tuples_to_send):
+        if len(task_tuples_to_send) == 1:
+            # One tuple, so send it in the main thread.
+            return [
+                send_task_to_executor(task_tuples_to_send[0])
+            ]
         # Use chunks instead of a work queue to reduce context switching
         # since tasks are roughly uniform in size
         chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))