You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/19 09:51:51 UTC

[airflow] branch main updated: Prevent running `airflow db init\upgrade` migrations and setup in parallel. (#17078)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fbc945d  Prevent running `airflow db init\upgrade` migrations and setup in parallel. (#17078)
fbc945d is described below

commit fbc945d2a2046feda18e7a1a902a318dab9e6fd2
Author: Anita Fronczak <af...@google.com>
AuthorDate: Mon Jul 19 11:51:35 2021 +0200

    Prevent running `airflow db init\upgrade` migrations and setup in parallel. (#17078)
---
 airflow/utils/db.py      | 23 ++++++++++++++---------
 airflow/utils/session.py |  4 +++-
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 055b1ce..9c6cfa9 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -575,18 +575,23 @@ def create_default_connections(session=None):
 @provide_session
 def initdb(session=None):
     """Initialize Airflow database."""
-    with create_global_lock(session=session):
-        upgradedb()
+    upgradedb(session=session)
+    filldb()
+
 
-        if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
-            create_default_connections()
+@provide_session
+def filldb(session=None):
+    if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
+        create_default_connections(session=session)
+
+    with create_global_lock(session=session):
 
         dagbag = DagBag()
         # Save DAGs in the ORM
-        dagbag.sync_to_db()
+        dagbag.sync_to_db(session=session)
 
         # Deactivate the unknown ones
-        DAG.deactivate_unknown_dags(dagbag.dags.keys())
+        DAG.deactivate_unknown_dags(dagbag.dags.keys(), session=session)
 
         from flask_appbuilder.models.sqla import Base
 
@@ -718,7 +723,7 @@ def upgradedb(session=None):
         return
     with create_global_lock(session=session, pg_lock_id=2, lock_name="upgrade"):
         command.upgrade(config, 'heads')
-        add_default_pool_if_not_exists()
+    add_default_pool_if_not_exists()
 
 
 @provide_session
@@ -728,11 +733,11 @@ def resetdb(session=None):
 
     connection = settings.engine.connect()
 
-    with create_global_lock(session=session, pg_lock_id=3, lock_name="reset"):
+    with create_global_lock(session=session, pg_lock_id=4, lock_name="reset"):
         drop_airflow_models(connection)
         drop_flask_models(connection)
 
-        initdb()
+    initdb(session=session)
 
 
 def drop_airflow_models(connection):
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 70c08f1..ce052de 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -104,4 +104,6 @@ def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_
             session.connection().execute(f"select RELEASE_LOCK('{lock_name}');")
 
         if dialect.name == 'mssql':
-            session.connection().execute(f"sp_releaseapplock @Resource = '{lock_name}';")
+            session.connection().execute(
+                f"sp_releaseapplock @Resource = '{lock_name}', @LockOwner = 'Session';"
+            )