Posted to commits@airflow.apache.org by as...@apache.org on 2020/06/17 13:06:18 UTC
[airflow] branch master updated: Refactor CeleryExecutor to avoid duplication of code in test (#9345)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 54667d1 Refactor CeleryExecutor to avoid duplication of code in test (#9345)
54667d1 is described below
commit 54667d1eaa626358702d07051f9cb4b1754a1481
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Jun 17 14:05:39 2020 +0100
Refactor CeleryExecutor to avoid duplication of code in test (#9345)
The test code had duplicated most of the code in "trigger_tasks",
meaning that, strictly speaking, we were no longer testing the
executor. This change removes the duplication by extracting that logic
into a separate method on the executor (_process_tasks) and calling it
from the test instead.
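
As a sketch of the resulting test pattern (illustrative only: the key
and command values below are made up, and a running Celery broker and
worker are assumed, as in the real integration test further down):

    import datetime

    from airflow.executors import celery_executor

    executor = celery_executor.CeleryExecutor()
    execute_date = datetime.datetime.now()

    # Hypothetical task tuple: (key, simple_task_instance, command, queue, task)
    key = ('example_dag', 'example_task', execute_date, 1)
    command = ['airflow', 'run', 'example_dag', 'example_task']
    task_tuples = [(key, None, command,
                    celery_executor.celery_configuration['task_default_queue'],
                    celery_executor.execute_command)]

    # Mirror what the scheduler would have queued for this key...
    for (k, simple_ti, cmd, queue, _task) in task_tuples:
        executor.queued_tasks[k] = (cmd, 1, queue, simple_ti)

    # ...then exercise the real executor code path instead of a copy of it.
    executor._process_tasks(task_tuples)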
---
airflow/executors/celery_executor.py | 47 ++++++++++++++++++---------------
tests/executors/test_celery_executor.py | 28 +++++---------------
2 files changed, 31 insertions(+), 44 deletions(-)
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 532b0ca..1dda3cb 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -175,28 +175,31 @@ class CeleryExecutor(BaseExecutor):
             task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))

         if task_tuples_to_send:
-            first_task = next(t[4] for t in task_tuples_to_send)
-
-            # Celery state queries will get stuck if we do not use one shared backend
-            # for all tasks.
-            cached_celery_backend = first_task.backend
-
-            key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
-            self.log.debug('Sent all tasks.')
-
-            for key, command, result in key_and_async_results:
-                if isinstance(result, ExceptionWithTraceback):
-                    self.log.error(  # pylint: disable=logging-not-lazy
-                        CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
-                    )
-                elif result is not None:
-                    # Only pops when enqueued successfully, otherwise keep it
-                    # and expect scheduler loop to deal with it.
-                    self.queued_tasks.pop(key)
-                    result.backend = cached_celery_backend
-                    self.running.add(key)
-                    self.tasks[key] = result
-                    self.last_state[key] = celery_states.PENDING
+            self._process_tasks(task_tuples_to_send)
+
+    def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> None:
+        first_task = next(t[4] for t in task_tuples_to_send)
+
+        # Celery state queries will get stuck if we do not use one shared backend
+        # for all tasks.
+        cached_celery_backend = first_task.backend
+
+        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
+        self.log.debug('Sent all tasks.')
+
+        for key, _, result in key_and_async_results:
+            if isinstance(result, ExceptionWithTraceback):
+                self.log.error(  # pylint: disable=logging-not-lazy
+                    CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
+                )
+            elif result is not None:
+                # Only pops when enqueued successfully, otherwise keep it
+                # and expect scheduler loop to deal with it.
+                self.queued_tasks.pop(key)
+                result.backend = cached_celery_backend
+                self.running.add(key)
+                self.tasks[key] = result
+                self.last_state[key] = celery_states.PENDING

     def _send_tasks_to_celery(self, task_tuples_to_send):
         if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index fc88c00..4fe0178 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -21,14 +21,13 @@ import json
 import os
 import sys
 import unittest
-from multiprocessing import Pool
 from unittest import mock

 # leave this, it is used by the test worker
 # noinspection PyUnresolvedReferences
 import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=unused-import
 import pytest
-from celery import Celery, states as celery_states
+from celery import Celery
 from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
@@ -116,7 +115,6 @@ class TestCeleryExecutor(unittest.TestCase):
         with start_worker(app=app, logfile=sys.stdout, loglevel='info'):
             execute_date = datetime.datetime.now()

-            cached_celery_backend = celery_executor.execute_command.backend
             task_tuples_to_send = [
                 (('success', 'fake_simple_ti', execute_date, 0),
                  None, success_command, celery_executor.celery_configuration['task_default_queue'],
@@ -126,25 +124,11 @@
                 celery_executor.execute_command)
             ]

-            chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-            num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-            with Pool(processes=num_processes) as send_pool:
-                key_and_async_results = send_pool.map(
-                    celery_executor.send_task_to_executor,
-                    task_tuples_to_send,
-                    chunksize=chunksize)
-
-            for task_instance_key, _, result in key_and_async_results:
-                # Only pops when enqueued successfully, otherwise keep it
-                # and expect scheduler loop to deal with it.
-                result.backend = cached_celery_backend
-                executor.running.add(task_instance_key)
-                executor.tasks[task_instance_key] = result
-                executor.last_state[task_instance_key] = celery_states.PENDING
-
-            executor.running.add(('success', 'fake_simple_ti', execute_date, 0))
-            executor.running.add(('fail', 'fake_simple_ti', execute_date, 0))
+            # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
+            for (key, simple_ti, command, queue, task) in task_tuples_to_send:
+                executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+
+            executor._process_tasks(task_tuples_to_send)

             executor.end(synchronous=True)
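
Why the cached backend matters: each AsyncResult otherwise carries its
own backend instance, and polling task state across many backends can
get stuck. A minimal standalone sketch of the same trick -- with a
hypothetical app and task, assuming a local Redis broker; this is not
code from the commit:

    from celery import Celery

    app = Celery('sketch', broker='redis://localhost:6379/0',
                 backend='redis://localhost:6379/0')

    @app.task
    def noop():
        return None

    # Send a few tasks, then point every AsyncResult at one shared
    # backend before polling state, mirroring what _process_tasks does
    # with cached_celery_backend.
    results = [noop.delay() for _ in range(3)]
    cached_backend = results[0].backend
    for result in results:
        result.backend = cached_backend
    print([result.state for result in results])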