You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/07/27 09:50:54 UTC

[airflow] branch main updated: Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0801322  Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)
0801322 is described below

commit 080132254b06127a6e2e8a2e23ceed6a7859d498
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Jul 27 11:50:18 2021 +0200

    Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)
    
    * Dont use TaskInstance in CeleryExecutor.trigger_tasks
    
    * fixup! Merge branch 'main' into celery-multiprocessing-2
---
 airflow/executors/celery_executor.py    | 14 +++++++-------
 tests/executors/test_celery_executor.py |  6 ++----
 2 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index b2c5016..e3fc398 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -46,7 +46,7 @@ from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
 from airflow.executors.base_executor import BaseExecutor, CommandType, EventBufferValueType
-from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
+from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.stats import Stats
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
@@ -154,15 +154,15 @@ class ExceptionWithTraceback:
 
 
 # Task instance that is sent over Celery queues
-# TaskInstanceKey, SimpleTaskInstance, Command, queue_name, CallableTask
-TaskInstanceInCelery = Tuple[TaskInstanceKey, SimpleTaskInstance, CommandType, Optional[str], Task]
+# TaskInstanceKey, Command, queue_name, CallableTask
+TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task]
 
 
 def send_task_to_executor(
     task_tuple: TaskInstanceInCelery,
 ) -> Tuple[TaskInstanceKey, CommandType, Union[AsyncResult, ExceptionWithTraceback]]:
     """Sends task to executor."""
-    key, _, command, queue, task_to_run = task_tuple
+    key, command, queue, task_to_run = task_tuple
     try:
         with timeout(seconds=OPERATION_TIMEOUT):
             result = task_to_run.apply_async(args=[command], queue=queue)
@@ -250,8 +250,8 @@ class CeleryExecutor(BaseExecutor):
         task_tuples_to_send: List[TaskInstanceInCelery] = []
 
         for _ in range(min(open_slots, len(self.queued_tasks))):
-            key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
-            task_tuple = (key, simple_ti, command, queue, execute_command)
+            key, (command, _, queue, _) = sorted_queue.pop(0)
+            task_tuple = (key, command, queue, execute_command)
             task_tuples_to_send.append(task_tuple)
             if key not in self.task_publish_retries:
                 self.task_publish_retries[key] = 1
@@ -260,7 +260,7 @@ class CeleryExecutor(BaseExecutor):
             self._process_tasks(task_tuples_to_send)
 
     def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> None:
-        first_task = next(t[4] for t in task_tuples_to_send)
+        first_task = next(t[3] for t in task_tuples_to_send)
 
         # Celery state queries will stuck if we do not use one same backend
         # for all tasks.
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 88ea95c..cdda8ee 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -126,14 +126,12 @@ class TestCeleryExecutor(unittest.TestCase):
                 task_tuples_to_send = [
                     (
                         ('success', 'fake_simple_ti', execute_date, 0),
-                        None,
                         success_command,
                         celery_executor.celery_configuration['task_default_queue'],
                         celery_executor.execute_command,
                     ),
                     (
                         ('fail', 'fake_simple_ti', execute_date, 0),
-                        None,
                         fail_command,
                         celery_executor.celery_configuration['task_default_queue'],
                         celery_executor.execute_command,
@@ -141,8 +139,8 @@ class TestCeleryExecutor(unittest.TestCase):
                 ]
 
                 # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
-                for (key, simple_ti, command, queue, task) in task_tuples_to_send:
-                    executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+                for (key, command, queue, task) in task_tuples_to_send:
+                    executor.queued_tasks[key] = (command, 1, queue, None)
                     executor.task_publish_retries[key] = 1
 
                 executor._process_tasks(task_tuples_to_send)