You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/08 07:06:54 UTC

[GitHub] [airflow] kosteev commented on a change in pull request #16311: Prevent running `airflow db init` migrations and setup in parallel.

kosteev commented on a change in pull request #16311:
URL: https://github.com/apache/airflow/pull/16311#discussion_r647161460



##########
File path: airflow/utils/db.py
##########
@@ -561,10 +571,21 @@ def create_default_connections(session=None):
     )
 
 
-def initdb():
+@provide_session
+def initdb(session=None):
     """Initialize Airflow database."""
+    if session.connection().dialect.name == 'postgresql':
+        log.info('Acquiring lock on database')
+        session.connection().execute('select PG_ADVISORY_LOCK(1);')
+
     upgradedb()
 
+    if session.connection().dialect.name == 'mysql' and session.connection().dialect.server_version_info >= (
+        5,
+        6,
+    ):
+        session.connection().execute("select GET_LOCK('db_init',1800);")

Review comment:
       Should this be before the `upgradedb()` call?

##########
File path: airflow/migrations/env.py
##########
@@ -101,9 +101,6 @@ def run_migrations_online():
         with context.begin_transaction():
             if connection.dialect.name == 'mysql' and connection.dialect.server_version_info >= (5, 6):
                 connection.execute("select GET_LOCK('alembic',1800);")
-            if connection.dialect.name == 'postgresql':
-                context.get_context()._ensure_version_table()  # pylint: disable=protected-access
-                connection.execute("LOCK TABLE alembic_version IN ACCESS EXCLUSIVE MODE")

Review comment:
       It looks like the comment on line 110 is now redundant.

##########
File path: airflow/utils/db.py
##########
@@ -69,6 +69,11 @@ def merge_conn(conn, session=None):
 @provide_session
 def add_default_pool_if_not_exists(session=None):
     """Add default pool if it does not exist."""
+    if session.connection().dialect.name == 'mysql' and session.connection().dialect.server_version_info >= (
+        5,
+        6,
+    ):
+        session.connection().execute("select GET_LOCK('pool',1800);")

Review comment:
       Do we need a lock here for postgres as well?

##########
File path: airflow/utils/db.py
##########
@@ -561,10 +571,21 @@ def create_default_connections(session=None):
     )
 
 
-def initdb():
+@provide_session
+def initdb(session=None):
     """Initialize Airflow database."""
+    if session.connection().dialect.name == 'postgresql':
+        log.info('Acquiring lock on database')
+        session.connection().execute('select PG_ADVISORY_LOCK(1);')
+
     upgradedb()
 
+    if session.connection().dialect.name == 'mysql' and session.connection().dialect.server_version_info >= (
+        5,
+        6,
+    ):
+        session.connection().execute("select GET_LOCK('db_init',1800);")

Review comment:
       I would also keep the order of the `if` statements consistent when acquiring/releasing the lock, e.g. first postgresql, then mysql.

##########
File path: airflow/migrations/env.py
##########
@@ -101,9 +101,6 @@ def run_migrations_online():
         with context.begin_transaction():
             if connection.dialect.name == 'mysql' and connection.dialect.server_version_info >= (5, 6):
                 connection.execute("select GET_LOCK('alembic',1800);")

Review comment:
       It is not clear why the lock is still here for mysql but no longer for postgresql.

##########
File path: airflow/utils/db.py
##########
@@ -69,6 +69,11 @@ def merge_conn(conn, session=None):
 @provide_session
 def add_default_pool_if_not_exists(session=None):
     """Add default pool if it does not exist."""
+    if session.connection().dialect.name == 'mysql' and session.connection().dialect.server_version_info >= (
+        5,
+        6,
+    ):
+        session.connection().execute("select GET_LOCK('pool',1800);")

Review comment:
       What do you think about wrapping the entire `upgradedb` command, to be consistent with `initdb` and to avoid locks inside nested methods?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org