You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/22 11:28:48 UTC

[airflow] branch v1-10-test updated (24b7416 -> 3f3a4b5)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


 discard 24b7416  Fix race in Celery tests by pre-creating result tables (#8909)
 discard d13c681  [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
     new 229f631  [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
     new 3f3a4b5  Fix race in Celery tests by pre-creating result tables (#8909)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (24b7416)
            \
             N -- N -- N   refs/heads/v1-10-test (3f3a4b5)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 tests/executors/test_celery_executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[airflow] 02/02: Fix race in Celery tests by pre-creating result tables (#8909)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3f3a4b5e2a0fb256c023d6087026608697f40a2f
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Tue May 19 13:21:44 2020 +0100

    Fix race in Celery tests by pre-creating result tables (#8909)
    
    We noticed our Celery tests failing sometimes with
    
    > (psycopg2.errors.UniqueViolation) duplicate key value violates unique
    > constraint "pg_type_typname_nsp_index"
    > DETAIL:  Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists
    
    It appears this is a race condition in SQLAlchemy's "create_all()"
    function, where it first checks which tables exist, builds up a list of
    `CREATE TABLE` statements, then issues them. Thus if two celery worker
    processes start at the same time, they will find the the table doesn't
    yet exist, and both try to create it.
    
    This is _probably_ a bug in SQLA, but this should be an easy enough fix
    here, to just ensure that the table exists before launching any Celery tasks.
    
    (cherry picked from commit bae5cc2f5ca32e0f61c3b92008fbd484184448ef)
---
 tests/executors/test_celery_executor.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index dcf9910..d9a15c7 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -61,6 +61,17 @@ class TestCeleryExecutor(unittest.TestCase):
         patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
         patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
 
+        backend = test_app.backend
+
+        if hasattr(backend, 'ResultSession'):
+            # Pre-create the database tables now, otherwise SQLA vis Celery has a
+            # race condition where it one of the subprocesses can die with "Table
+            # already exists" error, because SQLA checks for which tables exist,
+            # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
+            # EXISTS
+            session = backend.ResultSession()
+            session.close()
+
         with patch_app, patch_execute:
             try:
                 yield test_app
@@ -140,6 +151,7 @@ class TestCeleryExecutor(unittest.TestCase):
         self.assertEquals(1, len(executor.queued_tasks))
         self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
+    @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
         with self._prepare_app() as app:
             @app.task


[airflow] 01/02: [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)

Posted by as...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 229f631f2dafa7a698e634a614c8fa4102f68b38
Author: Joshua Carp <jm...@gmail.com>
AuthorDate: Mon Dec 3 04:19:25 2018 -0500

    [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)
    
    (cherry picked from commit 5710ef2615ad7a24c4b039f491e0fabd942978b3)
---
 tests/executors/test_celery_executor.py | 165 +++++++++++++++++++-------------
 1 file changed, 100 insertions(+), 65 deletions(-)

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index f76e588..dcf9910 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,69 +16,103 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import contextlib
+import os
 import sys
 import unittest
 from multiprocessing import Pool
 
+# leave this it is used by the test worker
+# noinspection PyUnresolvedReferences
+import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=unused-import
 import mock
-
-from celery.contrib.testing.worker import start_worker
 import pytest
+from celery import Celery
 from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
 
+from airflow.configuration import conf
 from airflow.executors import celery_executor
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
-                                               send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
 from airflow.utils.state import State
 
-# leave this it is used by the test worker
-import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=ungrouped-imports
+
+def _prepare_test_bodies():
+    if 'CELERY_BROKER_URLS' in os.environ:
+        return [
+            (url, )
+            for url in os.environ['CELERY_BROKER_URLS'].split(',')
+        ]
+    return [(conf.get('celery', 'BROKER_URL'))]
 
 
-class CeleryExecutorTest(unittest.TestCase):
+class TestCeleryExecutor(unittest.TestCase):
+
+    @contextlib.contextmanager
+    def _prepare_app(self, broker_url=None, execute=None):
+        broker_url = broker_url or conf.get('celery', 'BROKER_URL')
+        execute = execute or celery_executor.execute_command.__wrapped__
+
+        test_config = dict(celery_executor.celery_configuration)
+        test_config.update({'broker_url': broker_url})
+        test_app = Celery(broker_url, config_source=test_config)
+        test_execute = test_app.task(execute)
+        patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+        patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+        with patch_app, patch_execute:
+            try:
+                yield test_app
+            finally:
+                # Clear event loop to tear down each celery instance
+                set_event_loop(None)
+
+    @parameterized.expand(_prepare_test_bodies())
     @pytest.mark.integration("redis")
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
-    def test_celery_integration(self):
-        executor = CeleryExecutor()
-        executor.start()
-        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
-            success_command = ['true', 'some_parameter']
-            fail_command = ['false', 'some_parameter']
-
-            cached_celery_backend = execute_command.backend
-            task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command),
-                                   ('fail', 'fake_simple_ti', fail_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command)]
-
-            chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-            num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-            send_pool = Pool(processes=num_processes)
-            key_and_async_results = send_pool.map(
-                send_task_to_executor,
-                task_tuples_to_send,
-                chunksize=chunksize)
-
-            send_pool.close()
-            send_pool.join()
-
-            for key, command, result in key_and_async_results:
-                # Only pops when enqueued successfully, otherwise keep it
-                # and expect scheduler loop to deal with it.
-                result.backend = cached_celery_backend
-                executor.running[key] = command
-                executor.tasks[key] = result
-                executor.last_state[key] = celery_states.PENDING
-
-            executor.running['success'] = True
-            executor.running['fail'] = True
-
-            executor.end(synchronous=True)
+    def test_celery_integration(self, broker_url):
+        with self._prepare_app(broker_url) as app:
+            executor = celery_executor.CeleryExecutor()
+            executor.start()
+
+            with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+                success_command = ['true', 'some_parameter']
+                fail_command = ['false', 'some_parameter']
+
+                cached_celery_backend = celery_executor.execute_command.backend
+                task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command),
+                                       ('fail', 'fake_simple_ti', fail_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command)]
+
+                chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+                num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+                send_pool = Pool(processes=num_processes)
+                key_and_async_results = send_pool.map(
+                    celery_executor.send_task_to_executor,
+                    task_tuples_to_send,
+                    chunksize=chunksize)
+
+                send_pool.close()
+                send_pool.join()
+
+                for key, command, result in key_and_async_results:
+                    # Only pops when enqueued successfully, otherwise keep it
+                    # and expect scheduler loop to deal with it.
+                    result.backend = cached_celery_backend
+                    executor.running[key] = command
+                    executor.tasks[key] = result
+                    executor.last_state[key] = celery_states.PENDING
+
+                executor.running['success'] = True
+                executor.running['fail'] = True
+
+                executor.end(synchronous=True)
 
         self.assertTrue(executor.event_buffer['success'], State.SUCCESS)
         self.assertTrue(executor.event_buffer['fail'], State.FAILED)
@@ -93,31 +127,32 @@ class CeleryExecutorTest(unittest.TestCase):
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
     def test_error_sending_task(self):
-        @app.task
         def fake_execute_command():
             pass
 
-        # fake_execute_command takes no arguments while execute_command takes 1,
-        # which will cause TypeError when calling task.apply_async()
-        celery_executor.execute_command = fake_execute_command
-        executor = CeleryExecutor()
-        value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
-        executor.queued_tasks['key'] = value_tuple
-        executor.heartbeat()
-        self.assertEqual(1, len(executor.queued_tasks))
-        self.assertEqual(executor.queued_tasks['key'], value_tuple)
+        with self._prepare_app(execute=fake_execute_command):
+            # fake_execute_command takes no arguments while execute_command takes 1,
+            # which will cause TypeError when calling task.apply_async()
+            executor = celery_executor.CeleryExecutor()
+            value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+            executor.queued_tasks['key'] = value_tuple
+            executor.heartbeat()
+        self.assertEquals(1, len(executor.queued_tasks))
+        self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
     def test_exception_propagation(self):
-        @app.task
-        def fake_celery_task():
-            return {}
+        with self._prepare_app() as app:
+            @app.task
+            def fake_celery_task():
+                return {}
+
+            mock_log = mock.MagicMock()
+            executor = celery_executor.CeleryExecutor()
+            executor._log = mock_log
 
-        mock_log = mock.MagicMock()
-        executor = CeleryExecutor()
-        executor._log = mock_log
+            executor.tasks = {'key': fake_celery_task()}
+            executor.sync()
 
-        executor.tasks = {'key': fake_celery_task()}
-        executor.sync()
         assert mock_log.error.call_count == 1
         args, kwargs = mock_log.error.call_args_list[0]
         # Result of queuing is not a celery task but a dict,