Posted to commits@airflow.apache.org by po...@apache.org on 2021/03/22 20:37:14 UTC

[airflow] branch master updated: Acquire lock on db for the time of migration (#10151)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d96eb0  Acquire lock on db for the time of migration (#10151)
5d96eb0 is described below

commit 5d96eb0e00b831daf398f14919cbd7dadc95d254
Author: Anita Fronczak <ak...@gmail.com>
AuthorDate: Mon Mar 22 21:36:51 2021 +0100

    Acquire lock on db for the time of migration (#10151)
    
    We have a situation where both the web server and k8s airflow-initdb can
    start migrations at the same time.
    
    Alembic does not support concurrent migrations.
    
    The solution is based on https://github.com/sqlalchemy/alembic/issues/633
    
    Change-Id: I5b894c947ec2e56efab622357e160e7c300b7b99
    
    Co-authored-by: Felix Uellendall <fe...@users.noreply.github.com>
    
    Co-authored-by: Cloud Composer Team <no...@google.com>
---
 airflow/migrations/env.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 459cb7c..9b12f6c 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -99,7 +99,15 @@ def run_migrations_online():
         )
 
         with context.begin_transaction():
+            if connection.dialect.name == 'mysql' and connection.dialect.server_version_info >= (5, 6):
+                connection.execute("select GET_LOCK('alembic',1800);")
+            if connection.dialect.name == 'postgresql':
+                context.get_context()._ensure_version_table()  # pylint: disable=protected-access
+                connection.execute("LOCK TABLE alembic_version IN ACCESS EXCLUSIVE MODE")
             context.run_migrations()
+            if connection.dialect.name == 'mysql' and connection.dialect.server_version_info >= (5, 6):
+                connection.execute("select RELEASE_LOCK('alembic');")
+            # for Postgres lock is released when transaction ends
 
 
 if context.is_offline_mode():
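
For readers who want to see the locking pattern from the diff above in isolation, here is a minimal sketch, not the code that was committed. The names `migration_lock` and `execute` are hypothetical: `execute` is assumed to be any callable that runs one SQL string and returns the first column of the first row. Unlike the committed change, the sketch checks the result of GET_LOCK (MySQL returns 1 on success, 0 on timeout and NULL on error) and releases the lock in a `finally` block. It leaves out the MySQL server version check, and it assumes the caller has already made sure the alembic_version table exists before locking it on Postgres.

```python
from contextlib import contextmanager

LOCK_NAME = "alembic"          # same lock name as in the diff above
LOCK_TIMEOUT_SECONDS = 1800    # same timeout as in the diff above


@contextmanager
def migration_lock(dialect_name, execute):
    """Hold a database-level lock while the body of the `with` block runs.

    `execute` is a hypothetical callable: it runs one SQL string and
    returns the first column of the first row (or None).
    """
    if dialect_name == "mysql":
        got = execute(f"SELECT GET_LOCK('{LOCK_NAME}', {LOCK_TIMEOUT_SECONDS})")
        # GET_LOCK returns 1 on success, 0 on timeout, NULL on error.
        if got != 1:
            raise RuntimeError("could not acquire the migration lock")
        try:
            yield
        finally:
            # Release even if the migration itself raised.
            execute(f"SELECT RELEASE_LOCK('{LOCK_NAME}')")
    elif dialect_name == "postgresql":
        # The table lock is released when the surrounding transaction ends,
        # so there is nothing to undo here.
        execute("LOCK TABLE alembic_version IN ACCESS EXCLUSIVE MODE")
        yield
    else:
        # Other dialects: no locking in this sketch.
        yield
```

In an env.py like the one patched above, the `with migration_lock(...)` block would wrap the call to `context.run_migrations()` inside the transaction. How `execute` is wired to the connection is left open here; note that SQLAlchemy 2.0 no longer accepts a raw SQL string in `Connection.execute`, so such a wrapper would likely need `sqlalchemy.text()`.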