You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/26 15:16:39 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

kaxil commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757564067



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)

Review comment:
       ```suggestion
               conn.execute(text('SELECT PG_ADVISORY_LOCK(:id)'), id=lock.value)
   ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()

Review comment:
       Is there any reason we need both the `INIT` and `MIGRATIONS` locks, given that `initdb()` runs `upgradedb()` too?

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        if dialect.name == 'postgresql':
+            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)

Review comment:
       ```suggestion
               conn.execute(text('SELECT PG_ADVISORY_UNLOCK(:id)'), id=lock.value)
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -234,8 +234,25 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
         :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
+        from airflow.utils.db import DBLocks
+
         executable_tis: List[TI] = []
 
+        if session.get_bind().dialect.name == "postgresql":
+            # Optimization: to avoid littering the DB errors of "ERROR: canceling statement due to lock
+            # timeout", try to take out a transactional advisory lock (unlocks automatically on
+            # COMMIT/ROLLBACK)
+            lock_acquired = session.execute(
+                text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
+                    id=DBLocks.SCHEDULER_CRITICAL_SECTION

Review comment:
       ```suggestion
                       id=DBLocks.SCHEDULER_CRITICAL_SECTION.value
   ```
   
   

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)

Review comment:
       Nit: the SQL in the lines above and below is written in uppercase, so this function name should be uppercase too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org