You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/29 13:18:41 UTC

[GitHub] [airflow] uranusjr commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

uranusjr commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758351332



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        # The session may have been "closed" (which is fine, the lock lasts more than a transaction) -- ensure
+        # we get a usable connection
+        conn = session.connection()
+        if dialect.name == 'postgresql':
+            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("select RELEASE_LOCK(:id)"), id=str(lock))
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
       ```suggestion
               # TODO: make locking work for MSSQL
   ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
       ```suggestion
               # TODO: make locking work for MSSQL
   ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.

Review comment:
       ```suggestion
       Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we
       call ``str()``, which is implemented using the ``_name_`` field.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org