You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/07/27 09:50:54 UTC
[airflow] branch main updated: Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0801322 Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)
0801322 is described below
commit 080132254b06127a6e2e8a2e23ceed6a7859d498
Author: Kamil Breguła <mi...@users.noreply.github.com>
AuthorDate: Tue Jul 27 11:50:18 2021 +0200
Dont use TaskInstance in CeleryExecutor.trigger_tasks (#16248)
* Dont use TaskInstance in CeleryExecutor.trigger_tasks
* fixup! Merge branch 'main' into celery-multiprocessing-2
---
airflow/executors/celery_executor.py | 14 +++++++-------
tests/executors/test_celery_executor.py | 6 ++----
2 files changed, 9 insertions(+), 11 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index b2c5016..e3fc398 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -46,7 +46,7 @@ from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor, CommandType, EventBufferValueType
-from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
+from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
@@ -154,15 +154,15 @@ class ExceptionWithTraceback:
# Task instance that is sent over Celery queues
-# TaskInstanceKey, SimpleTaskInstance, Command, queue_name, CallableTask
-TaskInstanceInCelery = Tuple[TaskInstanceKey, SimpleTaskInstance, CommandType, Optional[str], Task]
+# TaskInstanceKey, Command, queue_name, CallableTask
+TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task]
def send_task_to_executor(
task_tuple: TaskInstanceInCelery,
) -> Tuple[TaskInstanceKey, CommandType, Union[AsyncResult, ExceptionWithTraceback]]:
"""Sends task to executor."""
- key, _, command, queue, task_to_run = task_tuple
+ key, command, queue, task_to_run = task_tuple
try:
with timeout(seconds=OPERATION_TIMEOUT):
result = task_to_run.apply_async(args=[command], queue=queue)
@@ -250,8 +250,8 @@ class CeleryExecutor(BaseExecutor):
task_tuples_to_send: List[TaskInstanceInCelery] = []
for _ in range(min(open_slots, len(self.queued_tasks))):
- key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
- task_tuple = (key, simple_ti, command, queue, execute_command)
+ key, (command, _, queue, _) = sorted_queue.pop(0)
+ task_tuple = (key, command, queue, execute_command)
task_tuples_to_send.append(task_tuple)
if key not in self.task_publish_retries:
self.task_publish_retries[key] = 1
@@ -260,7 +260,7 @@ class CeleryExecutor(BaseExecutor):
self._process_tasks(task_tuples_to_send)
def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> None:
- first_task = next(t[4] for t in task_tuples_to_send)
+ first_task = next(t[3] for t in task_tuples_to_send)
# Celery state queries will stuck if we do not use one same backend
# for all tasks.
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 88ea95c..cdda8ee 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -126,14 +126,12 @@ class TestCeleryExecutor(unittest.TestCase):
task_tuples_to_send = [
(
('success', 'fake_simple_ti', execute_date, 0),
- None,
success_command,
celery_executor.celery_configuration['task_default_queue'],
celery_executor.execute_command,
),
(
('fail', 'fake_simple_ti', execute_date, 0),
- None,
fail_command,
celery_executor.celery_configuration['task_default_queue'],
celery_executor.execute_command,
@@ -141,8 +139,8 @@ class TestCeleryExecutor(unittest.TestCase):
]
# "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
- for (key, simple_ti, command, queue, task) in task_tuples_to_send:
- executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+ for (key, command, queue, task) in task_tuples_to_send:
+ executor.queued_tasks[key] = (command, 1, queue, None)
executor.task_publish_retries[key] = 1
executor._process_tasks(task_tuples_to_send)