You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/19 09:51:51 UTC
[airflow] branch main updated: Prevent running `airflow db
init\upgrade` migrations and setup in parallel. (#17078)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fbc945d Prevent running `airflow db init\upgrade` migrations and setup in parallel. (#17078)
fbc945d is described below
commit fbc945d2a2046feda18e7a1a902a318dab9e6fd2
Author: Anita Fronczak <af...@google.com>
AuthorDate: Mon Jul 19 11:51:35 2021 +0200
Prevent running `airflow db init\upgrade` migrations and setup in parallel. (#17078)
---
airflow/utils/db.py | 23 ++++++++++++++---------
airflow/utils/session.py | 4 +++-
2 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 055b1ce..9c6cfa9 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -575,18 +575,23 @@ def create_default_connections(session=None):
@provide_session
def initdb(session=None):
"""Initialize Airflow database."""
- with create_global_lock(session=session):
- upgradedb()
+ upgradedb(session=session)
+ filldb()
+
- if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
- create_default_connections()
+@provide_session
+def filldb(session=None):
+ if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
+ create_default_connections(session=session)
+
+ with create_global_lock(session=session):
dagbag = DagBag()
# Save DAGs in the ORM
- dagbag.sync_to_db()
+ dagbag.sync_to_db(session=session)
# Deactivate the unknown ones
- DAG.deactivate_unknown_dags(dagbag.dags.keys())
+ DAG.deactivate_unknown_dags(dagbag.dags.keys(), session=session)
from flask_appbuilder.models.sqla import Base
@@ -718,7 +723,7 @@ def upgradedb(session=None):
return
with create_global_lock(session=session, pg_lock_id=2, lock_name="upgrade"):
command.upgrade(config, 'heads')
- add_default_pool_if_not_exists()
+ add_default_pool_if_not_exists()
@provide_session
@@ -728,11 +733,11 @@ def resetdb(session=None):
connection = settings.engine.connect()
- with create_global_lock(session=session, pg_lock_id=3, lock_name="reset"):
+ with create_global_lock(session=session, pg_lock_id=4, lock_name="reset"):
drop_airflow_models(connection)
drop_flask_models(connection)
- initdb()
+ initdb(session=session)
def drop_airflow_models(connection):
diff --git a/airflow/utils/session.py b/airflow/utils/session.py
index 70c08f1..ce052de 100644
--- a/airflow/utils/session.py
+++ b/airflow/utils/session.py
@@ -104,4 +104,6 @@ def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_
session.connection().execute(f"select RELEASE_LOCK('{lock_name}');")
if dialect.name == 'mssql':
- session.connection().execute(f"sp_releaseapplock @Resource = '{lock_name}';")
+ session.connection().execute(
+ f"sp_releaseapplock @Resource = '{lock_name}', @LockOwner = 'Session';"
+ )