You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/03 09:19:26 UTC
[GitHub] ashb closed pull request #4207: [AIRFLOW-3367] Run celery integration test with redis broker.
ashb closed pull request #4207: [AIRFLOW-3367] Run celery integration test with redis broker.
URL: https://github.com/apache/incubator-airflow/pull/4207
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
Because this pull request comes from a fork, the diff is supplied
below (GitHub would not otherwise show it):
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 954e17ca03..e85979dace 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,20 +16,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import os
import sys
import unittest
+import contextlib
from multiprocessing import Pool
import mock
-from celery.contrib.testing.worker import start_worker
-from airflow.executors import celery_executor
-from airflow.executors.celery_executor import CELERY_FETCH_ERR_MSG_HEADER
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
- send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
+from celery import Celery
from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
+
from airflow.utils.state import State
+from airflow.executors import celery_executor
from airflow import configuration
configuration.load_test_config()
@@ -38,48 +40,80 @@
import celery.contrib.testing.tasks # noqa: F401
+def _prepare_test_bodies():
+ if 'CELERY_BROKER_URLS' in os.environ:
+ return [
+ (url, )
+ for url in os.environ['CELERY_BROKER_URLS'].split(',')
+ ]
+ return [(configuration.conf.get('celery', 'BROKER_URL'))]
+
+
class CeleryExecutorTest(unittest.TestCase):
+
+ @contextlib.contextmanager
+ def _prepare_app(self, broker_url=None, execute=None):
+ broker_url = broker_url or configuration.conf.get('celery', 'BROKER_URL')
+ execute = execute or celery_executor.execute_command.__wrapped__
+
+ test_config = dict(celery_executor.celery_configuration)
+ test_config.update({'broker_url': broker_url})
+ test_app = Celery(broker_url, config_source=test_config)
+ test_execute = test_app.task(execute)
+ patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+ patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+ with patch_app, patch_execute:
+ try:
+ yield test_app
+ finally:
+ # Clear event loop to tear down each celery instance
+ set_event_loop(None)
+
+ @parameterized.expand(_prepare_test_bodies())
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
"sqlite is configured with SequentialExecutor")
- def test_celery_integration(self):
- executor = CeleryExecutor()
- executor.start()
- with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
- success_command = ['true', 'some_parameter']
- fail_command = ['false', 'some_parameter']
-
- cached_celery_backend = execute_command.backend
- task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
- celery_configuration['task_default_queue'],
- execute_command),
- ('fail', 'fake_simple_ti', fail_command,
- celery_configuration['task_default_queue'],
- execute_command)]
-
- chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
- num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
- send_pool = Pool(processes=num_processes)
- key_and_async_results = send_pool.map(
- send_task_to_executor,
- task_tuples_to_send,
- chunksize=chunksize)
-
- send_pool.close()
- send_pool.join()
-
- for key, command, result in key_and_async_results:
- # Only pops when enqueued successfully, otherwise keep it
- # and expect scheduler loop to deal with it.
- result.backend = cached_celery_backend
- executor.running[key] = command
- executor.tasks[key] = result
- executor.last_state[key] = celery_states.PENDING
-
- executor.running['success'] = True
- executor.running['fail'] = True
-
- executor.end(synchronous=True)
+ def test_celery_integration(self, broker_url):
+ with self._prepare_app(broker_url) as app:
+ executor = celery_executor.CeleryExecutor()
+ executor.start()
+
+ with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+ success_command = ['true', 'some_parameter']
+ fail_command = ['false', 'some_parameter']
+
+ cached_celery_backend = celery_executor.execute_command.backend
+ task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+ celery_executor.celery_configuration['task_default_queue'],
+ celery_executor.execute_command),
+ ('fail', 'fake_simple_ti', fail_command,
+ celery_executor.celery_configuration['task_default_queue'],
+ celery_executor.execute_command)]
+
+ chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+ num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+ send_pool = Pool(processes=num_processes)
+ key_and_async_results = send_pool.map(
+ celery_executor.send_task_to_executor,
+ task_tuples_to_send,
+ chunksize=chunksize)
+
+ send_pool.close()
+ send_pool.join()
+
+ for key, command, result in key_and_async_results:
+ # Only pops when enqueued successfully, otherwise keep it
+ # and expect scheduler loop to deal with it.
+ result.backend = cached_celery_backend
+ executor.running[key] = command
+ executor.tasks[key] = result
+ executor.last_state[key] = celery_states.PENDING
+
+ executor.running['success'] = True
+ executor.running['fail'] = True
+
+ executor.end(synchronous=True)
self.assertTrue(executor.event_buffer['success'], State.SUCCESS)
self.assertTrue(executor.event_buffer['fail'], State.FAILED)
@@ -93,38 +127,39 @@ def test_celery_integration(self):
@unittest.skipIf('sqlite' in configuration.conf.get('core', 'sql_alchemy_conn'),
"sqlite is configured with SequentialExecutor")
def test_error_sending_task(self):
- @app.task
def fake_execute_command():
pass
- # fake_execute_command takes no arguments while execute_command takes 1,
- # which will cause TypeError when calling task.apply_async()
- celery_executor.execute_command = fake_execute_command
- executor = CeleryExecutor()
- value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
- executor.queued_tasks['key'] = value_tuple
- executor.heartbeat()
+ with self._prepare_app(execute=fake_execute_command):
+ # fake_execute_command takes no arguments while execute_command takes 1,
+ # which will cause TypeError when calling task.apply_async()
+ executor = celery_executor.CeleryExecutor()
+ value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+ executor.queued_tasks['key'] = value_tuple
+ executor.heartbeat()
self.assertEquals(1, len(executor.queued_tasks))
self.assertEquals(executor.queued_tasks['key'], value_tuple)
def test_exception_propagation(self):
- @app.task
- def fake_celery_task():
- return {}
+ with self._prepare_app() as app:
+ @app.task
+ def fake_celery_task():
+ return {}
+
+ mock_log = mock.MagicMock()
+ executor = celery_executor.CeleryExecutor()
+ executor._log = mock_log
- mock_log = mock.MagicMock()
- executor = CeleryExecutor()
- executor._log = mock_log
+ executor.tasks = {'key': fake_celery_task()}
+ executor.sync()
- executor.tasks = {'key': fake_celery_task()}
- executor.sync()
mock_log.error.assert_called_once()
args, kwargs = mock_log.error.call_args_list[0]
log = args[0]
# Result of queuing is not a celery task but a dict,
# and it should raise AttributeError and then get propagated
# to the error log.
- self.assertIn(CELERY_FETCH_ERR_MSG_HEADER, log)
+ self.assertIn(celery_executor.CELERY_FETCH_ERR_MSG_HEADER, log)
self.assertIn('AttributeError', log)
diff --git a/tox.ini b/tox.ini
index 513bd83b1c..6065f9d072 100644
--- a/tox.ini
+++ b/tox.ini
@@ -43,6 +43,7 @@ setenv =
MINICLUSTER_HOME=/tmp/minicluster-1.1-SNAPSHOT
KRB5_CONFIG=/etc/krb5.conf
KRB5_KTNAME=/etc/airflow.keytab
+ CELERY_BROKER_URLS=amqp://guest:guest@rabbitmq:5672,redis://redis:6379/0
backend_mysql: AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql://root@mysql/airflow
backend_mysql: AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://root@mysql/airflow
backend_postgres: AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://postgres:airflow@postgres/airflow
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services