Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/22 11:21:18 UTC
[airflow] 01/02: [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d13c6810674dae4abbc572116852e6b331c9f40e
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Mon Dec 3 04:19:25 2018 -0500
[AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
(cherry picked from commit 5710ef2615ad7a24c4b039f491e0fabd942978b3)
---
tests/executors/test_celery_executor.py | 165 +++++++++++++++++++-------------
1 file changed, 100 insertions(+), 65 deletions(-)
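The patch below drops the module-level Celery app in favour of a per-test app, so the integration test can be parameterized over every broker URL listed in the CELERY_BROKER_URLS environment variable. A minimal sketch of that selection logic in isolation, assuming an illustrative Redis default URL (the _broker_urls helper name is hypothetical, not part of the commit):

    import os

    def _broker_urls(default='redis://localhost:6379/0'):
        # Comma-separated CELERY_BROKER_URLS takes precedence; each URL becomes
        # one argument tuple, which is the shape parameterized.expand expects.
        if 'CELERY_BROKER_URLS' in os.environ:
            return [(url,) for url in os.environ['CELERY_BROKER_URLS'].split(',')]
        return [(default,)]

    # CELERY_BROKER_URLS=redis://localhost:6379/0,amqp://guest@localhost//
    # yields [('redis://localhost:6379/0',), ('amqp://guest@localhost//',)]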
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index f76e588..511c90d 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,69 +16,103 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import contextlib
+import os
import sys
import unittest
from multiprocessing import Pool
+# leave this it is used by the test worker
+# noinspection PyUnresolvedReferences
+import celery.contrib.testing.tasks # noqa: F401 pylint: disable=unused-import
import mock
-
-from celery.contrib.testing.worker import start_worker
import pytest
+from celery import Celery
from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
+from airflow.configuration import conf
from airflow.executors import celery_executor
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
- send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
from airflow.utils.state import State
-# leave this it is used by the test worker
-import celery.contrib.testing.tasks # noqa: F401 pylint: disable=ungrouped-imports
+
+def _prepare_test_bodies():
+ if 'CELERY_BROKER_URLS' in os.environ:
+ return [
+ (url, )
+ for url in os.environ['CELERY_BROKER_URLS'].split(',')
+ ]
+ return [(conf.get('celery', 'BROKER_URL'))]
-class CeleryExecutorTest(unittest.TestCase):
+class TestCeleryExecutor(unittest.TestCase):
+
+ @contextlib.contextmanager
+ def _prepare_app(self, broker_url=None, execute=None):
+ broker_url = broker_url or conf.get('celery', 'BROKER_URL')
+ execute = execute or celery_executor.execute_command.__wrapped__
+
+ test_config = dict(celery_executor.celery_configuration)
+ test_config.update({'broker_url': broker_url})
+ test_app = Celery(broker_url, config_source=test_config)
+ test_execute = test_app.task(execute)
+ patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+ patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+ with patch_app, patch_execute:
+ try:
+ yield test_app
+ finally:
+ # Clear event loop to tear down each celery instance
+ set_event_loop(None)
+
@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.backend("mysql", "postgres")
- def test_celery_integration(self):
- executor = CeleryExecutor()
- executor.start()
- with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
- success_command = ['true', 'some_parameter']
- fail_command = ['false', 'some_parameter']
-
- cached_celery_backend = execute_command.backend
- task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
- celery_configuration['task_default_queue'],
- execute_command),
- ('fail', 'fake_simple_ti', fail_command,
- celery_configuration['task_default_queue'],
- execute_command)]
-
- chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
- num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
- send_pool = Pool(processes=num_processes)
- key_and_async_results = send_pool.map(
- send_task_to_executor,
- task_tuples_to_send,
- chunksize=chunksize)
-
- send_pool.close()
- send_pool.join()
-
- for key, command, result in key_and_async_results:
- # Only pops when enqueued successfully, otherwise keep it
- # and expect scheduler loop to deal with it.
- result.backend = cached_celery_backend
- executor.running[key] = command
- executor.tasks[key] = result
- executor.last_state[key] = celery_states.PENDING
-
- executor.running['success'] = True
- executor.running['fail'] = True
-
- executor.end(synchronous=True)
+ @parameterized.expand(_prepare_test_bodies())
+ def test_celery_integration(self, broker_url):
+ with self._prepare_app(broker_url) as app:
+ executor = celery_executor.CeleryExecutor()
+ executor.start()
+
+ with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+ success_command = ['true', 'some_parameter']
+ fail_command = ['false', 'some_parameter']
+
+ cached_celery_backend = celery_executor.execute_command.backend
+ task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+ celery_executor.celery_configuration['task_default_queue'],
+ celery_executor.execute_command),
+ ('fail', 'fake_simple_ti', fail_command,
+ celery_executor.celery_configuration['task_default_queue'],
+ celery_executor.execute_command)]
+
+ chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+ num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+ send_pool = Pool(processes=num_processes)
+ key_and_async_results = send_pool.map(
+ celery_executor.send_task_to_executor,
+ task_tuples_to_send,
+ chunksize=chunksize)
+
+ send_pool.close()
+ send_pool.join()
+
+ for key, command, result in key_and_async_results:
+ # Only pops when enqueued successfully, otherwise keep it
+ # and expect scheduler loop to deal with it.
+ result.backend = cached_celery_backend
+ executor.running[key] = command
+ executor.tasks[key] = result
+ executor.last_state[key] = celery_states.PENDING
+
+ executor.running['success'] = True
+ executor.running['fail'] = True
+
+ executor.end(synchronous=True)
self.assertTrue(executor.event_buffer['success'], State.SUCCESS)
self.assertTrue(executor.event_buffer['fail'], State.FAILED)
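The _prepare_app context manager introduced above is the heart of the change: it builds a throwaway Celery app bound to the requested broker, registers the execute callable as a task on that app, and patches both into airflow.executors.celery_executor for the duration of the test. A standalone sketch of the same pattern, assuming the patch targets shown in the diff (the _patched_celery_app name and everything else here is illustrative):

    import contextlib
    import mock
    from celery import Celery
    from kombu.asynchronous import set_event_loop

    @contextlib.contextmanager
    def _patched_celery_app(broker_url, execute):
        # An isolated app per test keeps broker state out of the module-level app.
        test_app = Celery(broker_url, config_source={'broker_url': broker_url})
        test_task = test_app.task(execute)
        patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
        patch_task = mock.patch(
            'airflow.executors.celery_executor.execute_command', test_task)
        with patch_app, patch_task:
            try:
                yield test_app
            finally:
                # kombu caches its event loop globally; reset it so the next
                # test's app does not reuse a loop from a torn-down instance.
                set_event_loop(None)

Note that the test passes execute_command.__wrapped__ rather than the already-decorated task, so the raw function can be re-registered as a task on each per-test app.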
@@ -93,31 +127,32 @@ class CeleryExecutorTest(unittest.TestCase):
@pytest.mark.integration("rabbitmq")
@pytest.mark.backend("mysql", "postgres")
def test_error_sending_task(self):
- @app.task
def fake_execute_command():
pass
- # fake_execute_command takes no arguments while execute_command takes 1,
- # which will cause TypeError when calling task.apply_async()
- celery_executor.execute_command = fake_execute_command
- executor = CeleryExecutor()
- value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
- executor.queued_tasks['key'] = value_tuple
- executor.heartbeat()
- self.assertEqual(1, len(executor.queued_tasks))
- self.assertEqual(executor.queued_tasks['key'], value_tuple)
+ with self._prepare_app(execute=fake_execute_command):
+ # fake_execute_command takes no arguments while execute_command takes 1,
+ # which will cause TypeError when calling task.apply_async()
+ executor = celery_executor.CeleryExecutor()
+ value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+ executor.queued_tasks['key'] = value_tuple
+ executor.heartbeat()
+ self.assertEquals(1, len(executor.queued_tasks))
+ self.assertEquals(executor.queued_tasks['key'], value_tuple)
def test_exception_propagation(self):
- @app.task
- def fake_celery_task():
- return {}
+ with self._prepare_app() as app:
+ @app.task
+ def fake_celery_task():
+ return {}
+
+ mock_log = mock.MagicMock()
+ executor = celery_executor.CeleryExecutor()
+ executor._log = mock_log
- mock_log = mock.MagicMock()
- executor = CeleryExecutor()
- executor._log = mock_log
+ executor.tasks = {'key': fake_celery_task()}
+ executor.sync()
- executor.tasks = {'key': fake_celery_task()}
- executor.sync()
assert mock_log.error.call_count == 1
args, kwargs = mock_log.error.call_args_list[0]
# Result of queuing is not a celery task but a dict,