Posted to commits@airflow.apache.org by as...@apache.org on 2020/05/22 11:21:19 UTC

[airflow] 02/02: Fix race in Celery tests by pre-creating result tables (#8909)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24b74168afe26500abaf82fd388eccee18611c4e
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Tue May 19 13:21:44 2020 +0100

    Fix race in Celery tests by pre-creating result tables (#8909)
    
    We noticed our Celery tests failing sometimes with
    
    > (psycopg2.errors.UniqueViolation) duplicate key value violates unique
    > constraint "pg_type_typname_nsp_index"
    > DETAIL:  Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists
    
    It appears this is a race condition in SQLAlchemy's "create_all()"
    function, where it first checks which tables exist, builds up a list of
    `CREATE TABLE` statements, then issues them. Thus if two Celery worker
    processes start at the same time, they will both find that the table
    doesn't yet exist, and both try to create it.
    
    This is _probably_ a bug in SQLA, but this should be an easy enough fix
    here, to just ensure that the table exists before launching any Celery tasks.
    
    (cherry picked from commit bae5cc2f5ca32e0f61c3b92008fbd484184448ef)
---
 tests/executors/test_celery_executor.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)
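
Note (not part of the commit): the check-then-create race described in the
commit message can be sketched deterministically as below. This is only an
illustration under stated assumptions: it uses the standard-library `sqlite3`
module rather than Postgres, a hand-written existence check rather than
SQLAlchemy's `create_all()`, and the helper names (`table_exists`,
`create_table`, `simulate_two_workers`) are hypothetical. The two "workers"
are interleaved by hand so that both checks happen before either create.

```python
import sqlite3

TABLE = "celery_tasksetmeta"  # table name taken from the error in the commit message


def table_exists(conn, name):
    """Return True if a table called `name` exists (SQLite-specific lookup)."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone()
    return row is not None


def create_table(conn, name):
    """Issue a plain CREATE TABLE, i.e. without IF NOT EXISTS."""
    conn.execute(f"CREATE TABLE {name} (id INTEGER PRIMARY KEY)")


def simulate_two_workers(conn, name):
    """Interleave two workers: both check first, then both act on the stale result."""
    # Step 1: both workers check for the table before either one creates it.
    missing = [not table_exists(conn, name) for _ in range(2)]

    # Step 2: each worker acts on what it saw in step 1.
    outcomes = []
    for is_missing in missing:
        if not is_missing:
            outcomes.append("skipped")
            continue
        try:
            create_table(conn, name)
            outcomes.append("created")
        except sqlite3.OperationalError:
            # The second worker hits "table ... already exists".
            outcomes.append("failed")
    return outcomes


# Without pre-creation: the second worker fails.
racy = simulate_two_workers(sqlite3.connect(":memory:"), TABLE)

# With the table created up front (the approach this commit takes for the
# Celery result tables): both workers see the table and skip creation.
fixed_conn = sqlite3.connect(":memory:")
create_table(fixed_conn, TABLE)
fixed = simulate_two_workers(fixed_conn, TABLE)

print(racy, fixed)
```

Creating the table before any worker starts removes the window between the
check and the create, which is why the test fixture opens and closes a
`ResultSession` before launching tasks.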

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 511c90d..80671cd 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -61,6 +61,17 @@ class TestCeleryExecutor(unittest.TestCase):
         patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
         patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
 
+        backend = test_app.backend
+
+        if hasattr(backend, 'ResultSession'):
+            # Pre-create the database tables now, otherwise SQLA via Celery has a
+            # race condition where one of the subprocesses can die with a "Table
+            # already exists" error, because SQLA checks which tables exist,
+            # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
+            # EXISTS
+            session = backend.ResultSession()
+            session.close()
+
         with patch_app, patch_execute:
             try:
                 yield test_app
@@ -140,6 +151,7 @@ class TestCeleryExecutor(unittest.TestCase):
         self.assertEquals(1, len(executor.queued_tasks))
         self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
+    @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
         with self._prepare_app() as app:
             @app.task