Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/03 09:19:26 UTC

[GitHub] ashb closed pull request #4207: [AIRFLOW-3367] Run celery integration test with redis broker.

URL: https://github.com/apache/incubator-airflow/pull/4207

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 954e17ca03..e85979dace 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,20 +16,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
 import sys
 import unittest
+import contextlib
 from multiprocessing import Pool
 
 import mock
-from celery.contrib.testing.worker import start_worker
 
-from airflow.executors import celery_executor
-from airflow.executors.celery_executor import CELERY_FETCH_ERR_MSG_HEADER
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
-                                               send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
+from celery import Celery
 from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
+
 from airflow.utils.state import State
+from airflow.executors import celery_executor
 
 from airflow import configuration
 configuration.load_test_config()
@@ -38,48 +40,80 @@
 import celery.contrib.testing.tasks  # noqa: F401
 
 
+def _prepare_test_bodies():
+    if 'CELERY_BROKER_URLS' in os.environ:
+        return [
+            (url, )
+            for url in os.environ['CELERY_BROKER_URLS'].split(',')
+        ]
+    return [(configuration.conf.get('celery', 'BROKER_URL'), )]
+
+
 class CeleryExecutorTest(unittest.TestCase):
+
+    @contextlib.contextmanager
+    def _prepare_app(self, broker_url=None, execute=None):
+        broker_url = broker_url or configuration.conf.get('celery', 'BROKER_URL')
+        execute = execute or celery_executor.execute_command.__wrapped__
+
+        test_config = dict(celery_executor.celery_configuration)
+        test_config.update({'broker_url': broker_url})
+        test_app = Celery(broker_url, config_source=test_config)
+        test_execute = test_app.task(execute)
+        patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+        patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+        with patch_app, patch_execute:
+            try:
+                yield test_app
+            finally:
+                # Clear event loop to tear down each celery instance
+                set_event_loop(None)
+
+    @parameterized.expand(_prepare_test_bodies())
     @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
                      "sqlite is configured with SequentialExecutor")
-    def test_celery_integration(self):
-        executor = CeleryExecutor()
-        executor.start()
-        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
-            success_command = ['true', 'some_parameter']
-            fail_command = ['false', 'some_parameter']
-
-            cached_celery_backend = execute_command.backend
-            task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command),
-                                   ('fail', 'fake_simple_ti', fail_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command)]
-
-            chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-            num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-            send_pool = Pool(processes=num_processes)
-            key_and_async_results = send_pool.map(
-                send_task_to_executor,
-                task_tuples_to_send,
-                chunksize=chunksize)
-
-            send_pool.close()
-            send_pool.join()
-
-            for key, command, result in key_and_async_results:
-                # Only pops when enqueued successfully, otherwise keep it
-                # and expect scheduler loop to deal with it.
-                result.backend = cached_celery_backend
-                executor.running[key] = command
-                executor.tasks[key] = result
-                executor.last_state[key] = celery_states.PENDING
-
-            executor.running['success'] = True
-            executor.running['fail'] = True
-
-            executor.end(synchronous=True)
+    def test_celery_integration(self, broker_url):
+        with self._prepare_app(broker_url) as app:
+            executor = celery_executor.CeleryExecutor()
+            executor.start()
+
+            with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+                success_command = ['true', 'some_parameter']
+                fail_command = ['false', 'some_parameter']
+
+                cached_celery_backend = celery_executor.execute_command.backend
+                task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command),
+                                       ('fail', 'fake_simple_ti', fail_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command)]
+
+                chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+                num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+                send_pool = Pool(processes=num_processes)
+                key_and_async_results = send_pool.map(
+                    celery_executor.send_task_to_executor,
+                    task_tuples_to_send,
+                    chunksize=chunksize)
+
+                send_pool.close()
+                send_pool.join()
+
+                for key, command, result in key_and_async_results:
+                    # Only pops when enqueued successfully, otherwise keep it
+                    # and expect scheduler loop to deal with it.
+                    result.backend = cached_celery_backend
+                    executor.running[key] = command
+                    executor.tasks[key] = result
+                    executor.last_state[key] = celery_states.PENDING
+
+                executor.running['success'] = True
+                executor.running['fail'] = True
+
+                executor.end(synchronous=True)
 
         self.assertEqual(executor.event_buffer['success'], State.SUCCESS)
         self.assertEqual(executor.event_buffer['fail'], State.FAILED)
@@ -93,38 +127,39 @@ def test_celery_integration(self):
     @unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
                      "sqlite is configured with SequentialExecutor")
     def test_error_sending_task(self):
-        @app.task
         def fake_execute_command():
             pass
 
-        # fake_execute_command takes no arguments while execute_command takes 1,
-        # which will cause TypeError when calling task.apply_async()
-        celery_executor.execute_command = fake_execute_command
-        executor = CeleryExecutor()
-        value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
-        executor.queued_tasks['key'] = value_tuple
-        executor.heartbeat()
+        with self._prepare_app(execute=fake_execute_command):
+            # fake_execute_command takes no arguments while execute_command takes 1,
+            # which will cause TypeError when calling task.apply_async()
+            executor = celery_executor.CeleryExecutor()
+            value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+            executor.queued_tasks['key'] = value_tuple
+            executor.heartbeat()
         self.assertEquals(1, len(executor.queued_tasks))
         self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
     def test_exception_propagation(self):
-        @app.task
-        def fake_celery_task():
-            return {}
+        with self._prepare_app() as app:
+            @app.task
+            def fake_celery_task():
+                return {}
+
+            mock_log = mock.MagicMock()
+            executor = celery_executor.CeleryExecutor()
+            executor._log = mock_log
 
-        mock_log = mock.MagicMock()
-        executor = CeleryExecutor()
-        executor._log = mock_log
+            executor.tasks = {'key': fake_celery_task()}
+            executor.sync()
 
-        executor.tasks = {'key': fake_celery_task()}
-        executor.sync()
         mock_log.error.assert_called_once()
         args, kwargs = mock_log.error.call_args_list[0]
         log = args[0]
         # Result of queuing is not a celery task but a dict,
         # and it should raise AttributeError and then get propagated
         # to the error log.
-        self.assertIn(CELERY_FETCH_ERR_MSG_HEADER, log)
+        self.assertIn(celery_executor.CELERY_FETCH_ERR_MSG_HEADER, log)
         self.assertIn('AttributeError', log)
 
 
diff --git a/tox.ini b/tox.ini
index 513bd83b1c..6065f9d072 100644
--- a/tox.ini
+++ b/tox.ini
@@ -43,6 +43,7 @@ setenv =
   MINICLUSTER_HOME=/tmp/minicluster-1.1-SNAPSHOT
   KRB5_CONFIG=/etc/krb5.conf
   KRB5_KTNAME=/etc/airflow.keytab
+  CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
   backend_mysql: AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow
   backend_mysql: AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow
   backend_postgres: AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:airflow@postgres/airflow

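A note for readers skimming the patch: the heart of the change is that the Celery
integration test now runs once per broker listed in the new CELERY_BROKER_URLS
environment variable (RabbitMQ and Redis in the tox.ini hunk above). The following
is a minimal standalone sketch, not part of the patch, of how that comma-separated
variable feeds parameterized.expand; the class and method names are illustrative only.

import os
import unittest

from parameterized import parameterized

# Illustrative default; tox.ini sets the same variable for CI runs.
os.environ.setdefault('CELERY_BROKER_URLS',
                      'amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0')


def _broker_urls():
    # Mirrors _prepare_test_bodies(): one (url, ) tuple per configured broker.
    return [(url, ) for url in os.environ['CELERY_BROKER_URLS'].split(',')]


class BrokerParametrizationExample(unittest.TestCase):

    @parameterized.expand(_broker_urls())
    def test_broker_url_is_well_formed(self, broker_url):
        # The real test starts a Celery worker against broker_url; this stub
        # only shows that each URL becomes its own generated test case.
        self.assertIn('://', broker_url)


if __name__ == '__main__':
    unittest.main()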

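The other moving part is the _prepare_app() context manager added to the test class:
it builds a throwaway Celery app bound to the broker under test, patches it over the
module-level app in airflow.executors.celery_executor, and clears kombu's cached
event loop so back-to-back parameterized runs do not share state. A condensed sketch
of that pattern follows; the helper name and config handling are illustrative, not
the exact Airflow code.

import contextlib
from unittest import mock

from celery import Celery
from kombu.asynchronous import set_event_loop


@contextlib.contextmanager
def prepare_app(broker_url, base_config=None):
    # Build an isolated Celery app speaking to this broker only.
    config = dict(base_config or {})
    config['broker_url'] = broker_url
    test_app = Celery(broker_url, config_source=config)

    # Swap the executor module's app for the duration of the test.
    with mock.patch('airflow.executors.celery_executor.app', test_app):
        try:
            yield test_app
        finally:
            # kombu keeps a process-global event loop; clearing it gives the
            # next broker under test a fresh one.
            set_event_loop(None)
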
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services