You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/06/17 13:06:18 UTC

[airflow] branch master updated: Refactor CeleryExecutor to avoid duplication of code in test (#9345)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 54667d1  Refactor CeleryExecutor to avoid duplication of code in test (#9345)
54667d1 is described below

commit 54667d1eaa626358702d07051f9cb4b1754a1481
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Wed Jun 17 14:05:39 2020 +0100

    Refactor CeleryExecutor to avoid duplication of code in test (#9345)
    
    The test code had duplicated most of the code in "trigger_tasks" --
    meaning we weren't, strictly speaking, testing the executor anymore.
    
    This removes the duplicated code in the test by refactoring the method
    in the executor and calling that instead.
---
 airflow/executors/celery_executor.py    | 47 ++++++++++++++++++---------------
 tests/executors/test_celery_executor.py | 28 +++++---------------
 2 files changed, 31 insertions(+), 44 deletions(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 532b0ca..1dda3cb 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -175,28 +175,31 @@ class CeleryExecutor(BaseExecutor):
             task_tuples_to_send.append((key, simple_ti, command, queue, execute_command))
 
         if task_tuples_to_send:
-            first_task = next(t[4] for t in task_tuples_to_send)
-
-            # Celery state queries will stuck if we do not use one same backend
-            # for all tasks.
-            cached_celery_backend = first_task.backend
-
-            key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
-            self.log.debug('Sent all tasks.')
-
-            for key, command, result in key_and_async_results:
-                if isinstance(result, ExceptionWithTraceback):
-                    self.log.error(  # pylint: disable=logging-not-lazy
-                        CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
-                    )
-                elif result is not None:
-                    # Only pops when enqueued successfully, otherwise keep it
-                    # and expect scheduler loop to deal with it.
-                    self.queued_tasks.pop(key)
-                    result.backend = cached_celery_backend
-                    self.running.add(key)
-                    self.tasks[key] = result
-                    self.last_state[key] = celery_states.PENDING
+            self._process_tasks(task_tuples_to_send)
+
+    def _process_tasks(self, task_tuples_to_send: List[TaskInstanceInCelery]) -> None:
+        first_task = next(t[4] for t in task_tuples_to_send)
+
+        # Celery state queries will stuck if we do not use one same backend
+        # for all tasks.
+        cached_celery_backend = first_task.backend
+
+        key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
+        self.log.debug('Sent all tasks.')
+
+        for key, _, result in key_and_async_results:
+            if isinstance(result, ExceptionWithTraceback):
+                self.log.error(  # pylint: disable=logging-not-lazy
+                    CELERY_SEND_ERR_MSG_HEADER + ":%s\n%s\n", result.exception, result.traceback
+                )
+            elif result is not None:
+                # Only pops when enqueued successfully, otherwise keep it
+                # and expect scheduler loop to deal with it.
+                self.queued_tasks.pop(key)
+                result.backend = cached_celery_backend
+                self.running.add(key)
+                self.tasks[key] = result
+                self.last_state[key] = celery_states.PENDING
 
     def _send_tasks_to_celery(self, task_tuples_to_send):
         if len(task_tuples_to_send) == 1 or self._sync_parallelism == 1:
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index fc88c00..4fe0178 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -21,14 +21,13 @@ import json
 import os
 import sys
 import unittest
-from multiprocessing import Pool
 from unittest import mock
 
 # leave this it is used by the test worker
 # noinspection PyUnresolvedReferences
 import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=unused-import
 import pytest
-from celery import Celery, states as celery_states
+from celery import Celery
 from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
@@ -116,7 +115,6 @@ class TestCeleryExecutor(unittest.TestCase):
             with start_worker(app=app, logfile=sys.stdout, loglevel='info'):
                 execute_date = datetime.datetime.now()
 
-                cached_celery_backend = celery_executor.execute_command.backend
                 task_tuples_to_send = [
                     (('success', 'fake_simple_ti', execute_date, 0),
                      None, success_command, celery_executor.celery_configuration['task_default_queue'],
@@ -126,25 +124,11 @@ class TestCeleryExecutor(unittest.TestCase):
                      celery_executor.execute_command)
                 ]
 
-                chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-                num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-                with Pool(processes=num_processes) as send_pool:
-                    key_and_async_results = send_pool.map(
-                        celery_executor.send_task_to_executor,
-                        task_tuples_to_send,
-                        chunksize=chunksize)
-
-                for task_instance_key, _, result in key_and_async_results:
-                    # Only pops when enqueued successfully, otherwise keep it
-                    # and expect scheduler loop to deal with it.
-                    result.backend = cached_celery_backend
-                    executor.running.add(task_instance_key)
-                    executor.tasks[task_instance_key] = result
-                    executor.last_state[task_instance_key] = celery_states.PENDING
-
-                executor.running.add(('success', 'fake_simple_ti', execute_date, 0))
-                executor.running.add(('fail', 'fake_simple_ti', execute_date, 0))
+                # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
+                for (key, simple_ti, command, queue, task) in task_tuples_to_send:
+                    executor.queued_tasks[key] = (command, 1, queue, simple_ti)
+
+                executor._process_tasks(task_tuples_to_send)
 
                 executor.end(synchronous=True)