Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/22 11:21:18 UTC

[airflow] 01/02: [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d13c6810674dae4abbc572116852e6b331c9f40e
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Mon Dec 3 04:19:25 2018 -0500

    [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
    
    (cherry picked from commit 5710ef2615ad7a24c4b039f491e0fabd942978b3)
---
 tests/executors/test_celery_executor.py | 165 +++++++++++++++++++-------------
 1 file changed, 100 insertions(+), 65 deletions(-)
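
Editor's note: the patch replaces the single hard-coded broker with a per-broker parameterization. _prepare_test_bodies() reads a comma-separated CELERY_BROKER_URLS environment variable (falling back to the configured celery BROKER_URL) and parameterized.expand turns each URL into its own test case. A minimal, self-contained sketch of that pattern follows; the default URL and the test body are placeholders, not values from the patch.

    # Sketch only: env-var-driven parameterization, mirroring _prepare_test_bodies().
    import os
    import unittest

    from parameterized import parameterized


    def _broker_urls():
        # A comma-separated CELERY_BROKER_URLS wins; otherwise fall back to one
        # placeholder URL (the real test falls back to the Airflow config value).
        return os.environ.get('CELERY_BROKER_URLS', 'redis://localhost:6379/0').split(',')


    class TestPerBrokerParametrization(unittest.TestCase):

        # expand() generates one test method per (broker_url,) tuple, so a
        # failure reports which broker it happened against.
        @parameterized.expand([(url,) for url in _broker_urls()])
        def test_broker_url_looks_like_a_url(self, broker_url):
            self.assertIn('://', broker_url)


    if __name__ == '__main__':
        unittest.main()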

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index f76e588..511c90d 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,69 +16,103 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import contextlib
+import os
 import sys
 import unittest
 from multiprocessing import Pool
 
+# leave this it is used by the test worker
+# noinspection PyUnresolvedReferences
+import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=unused-import
 import mock
-
-from celery.contrib.testing.worker import start_worker
 import pytest
+from celery import Celery
 from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
 
+from airflow.configuration import conf
 from airflow.executors import celery_executor
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
-                                               send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
 from airflow.utils.state import State
 
-# leave this it is used by the test worker
-import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=ungrouped-imports
+
+def _prepare_test_bodies():
+    if 'CELERY_BROKER_URLS' in os.environ:
+        return [
+            (url, )
+            for url in os.environ['CELERY_BROKER_URLS'].split(',')
+        ]
+    return [(conf.get('celery', 'BROKER_URL'))]
 
 
-class CeleryExecutorTest(unittest.TestCase):
+class TestCeleryExecutor(unittest.TestCase):
+
+    @contextlib.contextmanager
+    def _prepare_app(self, broker_url=None, execute=None):
+        broker_url = broker_url or conf.get('celery', 'BROKER_URL')
+        execute = execute or celery_executor.execute_command.__wrapped__
+
+        test_config = dict(celery_executor.celery_configuration)
+        test_config.update({'broker_url': broker_url})
+        test_app = Celery(broker_url, config_source=test_config)
+        test_execute = test_app.task(execute)
+        patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+        patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+        with patch_app, patch_execute:
+            try:
+                yield test_app
+            finally:
+                # Clear event loop to tear down each celery instance
+                set_event_loop(None)
+
     @pytest.mark.integration("redis")
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
-    def test_celery_integration(self):
-        executor = CeleryExecutor()
-        executor.start()
-        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
-            success_command = ['true', 'some_parameter']
-            fail_command = ['false', 'some_parameter']
-
-            cached_celery_backend = execute_command.backend
-            task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command),
-                                   ('fail', 'fake_simple_ti', fail_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command)]
-
-            chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-            num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-            send_pool = Pool(processes=num_processes)
-            key_and_async_results = send_pool.map(
-                send_task_to_executor,
-                task_tuples_to_send,
-                chunksize=chunksize)
-
-            send_pool.close()
-            send_pool.join()
-
-            for key, command, result in key_and_async_results:
-                # Only pops when enqueued successfully, otherwise keep it
-                # and expect scheduler loop to deal with it.
-                result.backend = cached_celery_backend
-                executor.running[key] = command
-                executor.tasks[key] = result
-                executor.last_state[key] = celery_states.PENDING
-
-            executor.running['success'] = True
-            executor.running['fail'] = True
-
-            executor.end(synchronous=True)
+    @parameterized.expand(_prepare_test_bodies())
+    def test_celery_integration(self, broker_url):
+        with self._prepare_app(broker_url) as app:
+            executor = celery_executor.CeleryExecutor()
+            executor.start()
+
+            with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+                success_command = ['true', 'some_parameter']
+                fail_command = ['false', 'some_parameter']
+
+                cached_celery_backend = celery_executor.execute_command.backend
+                task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command),
+                                       ('fail', 'fake_simple_ti', fail_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command)]
+
+                chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+                num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+                send_pool = Pool(processes=num_processes)
+                key_and_async_results = send_pool.map(
+                    celery_executor.send_task_to_executor,
+                    task_tuples_to_send,
+                    chunksize=chunksize)
+
+                send_pool.close()
+                send_pool.join()
+
+                for key, command, result in key_and_async_results:
+                    # Only pops when enqueued successfully, otherwise keep it
+                    # and expect scheduler loop to deal with it.
+                    result.backend = cached_celery_backend
+                    executor.running[key] = command
+                    executor.tasks[key] = result
+                    executor.last_state[key] = celery_states.PENDING
+
+                executor.running['success'] = True
+                executor.running['fail'] = True
+
+                executor.end(synchronous=True)
 
         self.assertTrue(executor.event_buffer['success'], State.SUCCESS)
         self.assertTrue(executor.event_buffer['fail'], State.FAILED)
@@ -93,31 +127,32 @@ class CeleryExecutorTest(unittest.TestCase):
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
     def test_error_sending_task(self):
-        @app.task
         def fake_execute_command():
             pass
 
-        # fake_execute_command takes no arguments while execute_command takes 1,
-        # which will cause TypeError when calling task.apply_async()
-        celery_executor.execute_command = fake_execute_command
-        executor = CeleryExecutor()
-        value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
-        executor.queued_tasks['key'] = value_tuple
-        executor.heartbeat()
-        self.assertEqual(1, len(executor.queued_tasks))
-        self.assertEqual(executor.queued_tasks['key'], value_tuple)
+        with self._prepare_app(execute=fake_execute_command):
+            # fake_execute_command takes no arguments while execute_command takes 1,
+            # which will cause TypeError when calling task.apply_async()
+            executor = celery_executor.CeleryExecutor()
+            value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+            executor.queued_tasks['key'] = value_tuple
+            executor.heartbeat()
+        self.assertEquals(1, len(executor.queued_tasks))
+        self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
     def test_exception_propagation(self):
-        @app.task
-        def fake_celery_task():
-            return {}
+        with self._prepare_app() as app:
+            @app.task
+            def fake_celery_task():
+                return {}
+
+            mock_log = mock.MagicMock()
+            executor = celery_executor.CeleryExecutor()
+            executor._log = mock_log
 
-        mock_log = mock.MagicMock()
-        executor = CeleryExecutor()
-        executor._log = mock_log
+            executor.tasks = {'key': fake_celery_task()}
+            executor.sync()
 
-        executor.tasks = {'key': fake_celery_task()}
-        executor.sync()
         assert mock_log.error.call_count == 1
         args, kwargs = mock_log.error.call_args_list[0]
         # Result of queuing is not a celery task but a dict,
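
Editor's note: the new _prepare_app() helper above follows a patch-and-restore pattern: build a throwaway Celery app bound to the broker under test, patch it (and the execute_command task) over the module-level objects in airflow.executors.celery_executor, and clear kombu's event loop on exit so successive parameterized cases do not share state. Below is a stripped-down sketch of that pattern, using a stand-in object in place of the Airflow module so it runs without Airflow installed; names and the memory:// broker are illustrative only.

    # Sketch only: the patch-and-restore pattern behind _prepare_app().
    import contextlib
    import types
    from unittest import mock

    from celery import Celery
    from kombu.asynchronous import set_event_loop

    # Stand-in for airflow.executors.celery_executor and its module-level app.
    fake_executor_module = types.SimpleNamespace(app=Celery('default_app'))


    @contextlib.contextmanager
    def prepare_app(broker_url='memory://'):
        # A throwaway app bound to the broker under test.
        test_app = Celery('test_app', broker=broker_url)
        with mock.patch.object(fake_executor_module, 'app', test_app):
            try:
                yield test_app
            finally:
                # Each Celery app installs its own kombu event loop; clear it so
                # the next parameterized case starts from a clean slate.
                set_event_loop(None)


    original_app = fake_executor_module.app
    with prepare_app() as app:
        assert fake_executor_module.app is app       # patched inside the block
    assert fake_executor_module.app is original_app  # restored afterwards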