Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/26 14:35:17 UTC

[GitHub] [airflow] ashb opened a new pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

ashb opened a new pull request #19842:
URL: https://github.com/apache/airflow/pull/19842


   If you are running multiple schedulers on PostgreSQL, it is likely that sooner or later you will have one scheduler fail the race to enter the critical section (which is fine, and expected).
   
   However, this can end up spamming the DB logs with errors like this:
   
   ```
   Nov 26 14:08:48 sinope postgres[709953]: 2021-11-26 14:08:48.672 GMT [709953] ERROR:  could not obtain lock on row in relation "slot_pool"
   Nov 26 14:08:48 sinope postgres[709953]: 2021-11-26 14:08:48.672 GMT [709953] STATEMENT:  SELECT slot_pool.pool AS slot_pool_pool, slot_pool.slots AS slot_pool_slots
   Nov 26 14:08:48 sinope postgres[709953]:         FROM slot_pool FOR UPDATE NOWAIT
   Nov 26 14:08:49 sinope postgres[709954]: 2021-11-26 14:08:49.730 GMT [709954] ERROR:  could not obtain lock on row in relation "slot_pool"
   Nov 26 14:08:49 sinope postgres[709954]: 2021-11-26 14:08:49.730 GMT [709954] STATEMENT:  SELECT slot_pool.pool AS slot_pool_pool, slot_pool.slots AS slot_pool_slots
   Nov 26 14:08:49 sinope postgres[709954]:         FROM slot_pool FOR UPDATE NOWAIT
   ```
   
   If you are really unlucky, this can happen over and over again.
   
   So, to avoid this error, for PostgreSQL only, we first try to acquire an "advisory lock" (advisory because it's up to the application to respect it), and if we cannot, we raise an error _like_ the one that would have come from the `FOR UPDATE NOWAIT`.
   
   (We still obtain the exclusive lock on the pool rows so that the rows are locked.)
   
   This PR is split into two commits: the second obtains the lock, and the first refactors the existing global locks to use enums, removing magic constants. These lock ids (integers, for Postgres) are "global", so we need to be sure the scheduler's lock doesn't clash with the `db upgrade` lock.
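   
   To illustrate the approach, here is a minimal sketch, assuming SQLAlchemy 1.3-style APIs and a local Postgres database (the lock id, names, and the error raised are illustrative, not the PR's exact code):
   
    ```
    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import sessionmaker

    SCHEDULER_LOCK_ID = 3  # any stable int64; must not clash with other global lock ids

    engine = create_engine("postgresql:///airflow")
    session = sessionmaker(bind=engine)()

    # Transactional advisory lock: released automatically on COMMIT/ROLLBACK.
    # pg_try_advisory_xact_lock() never blocks and never writes a server-side ERROR.
    acquired = session.execute(
        text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(id=SCHEDULER_LOCK_ID)
    ).scalar()
    if not acquired:
        # Behave as if FOR UPDATE NOWAIT had failed, minus the noisy server log line.
        raise RuntimeError("another scheduler holds the critical section")

    # We won the race: take the row locks as before (uncontended among schedulers
    # that respect the advisory lock).
    session.execute(text("SELECT pool, slots FROM slot_pool FOR UPDATE"))
    ```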


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-981622670


   Interesting.... it was passing without changing the lock. Let me think.





[GitHub] [airflow] kaxil commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757564067



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)

Review comment:
    ```suggestion
            conn.execute(text('SELECT PG_ADVISORY_LOCK(:id)'), id=lock.value)
    ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()

Review comment:
       Any reason we need both `init` and `migrations` as `initdb()` runs `upgradedb()` too?

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        if dialect.name == 'postgresql':
+            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)

Review comment:
    ```suggestion
            conn.execute(text('SELECT PG_ADVISORY_UNLOCK(:id)'), id=lock.value)
    ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -234,8 +234,25 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
         :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
+        from airflow.utils.db import DBLocks
+
         executable_tis: List[TI] = []
 
+        if session.get_bind().dialect.name == "postgresql":
+            # Optimization: to avoid littering the DB errors of "ERROR: canceling statement due to lock
+            # timeout", try to take out a transactional advisory lock (unlocks automatically on
+            # COMMIT/ROLLBACK)
+            lock_acquired = session.execute(
+                text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
+                    id=DBLocks.SCHEDULER_CRITICAL_SECTION

Review comment:
    ```suggestion
                        id=DBLocks.SCHEDULER_CRITICAL_SECTION.value
    ```
   
   

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)

Review comment:
    nit: the others are in CAPS in the lines above and below







[GitHub] [airflow] ashb commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-980023917


   /cc @wolfier @collinmcnulty 





[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757579779



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()

Review comment:
       Yeah, I wasn't sure. Previously there were _three_ separate locks: one for upgrade, one for reset, and one for the "DAG sync" part of init.
   
    This init lock could perhaps be dropped entirely, yes.







[GitHub] [airflow] uranusjr commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758351332



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        # The session may have been "closed" (which is fine, the lock lasts more than a transaction) -- ensure
+        # we get a usable connection
+        conn = session.connection()
+        if dialect.name == 'postgresql':
+            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
+            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("select RELEASE_LOCK(:id)"), id=str(lock))
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
    ```suggestion
            # TODO: make locking work for MSSQL
    ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL

Review comment:
    ```suggestion
            # TODO: make locking work for MSSQL
    ```

##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.

Review comment:
    ```suggestion
        Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we
        call ``str()``, which is implemented using the ``_name_`` field.
    ```







[GitHub] [airflow] ashb commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-981734201


   Fixed in 3b0ec6234 -- the connection we issued the lock from was closed when the migrations ran.
   
   The fix there is to use the same connection so that when the migrations code is finished with it, it doesn't get closed, and we can still unlock it.
   
   I suspect this might have been the problem with the MSSQL locking too -- it's just that on Postgres we never noticed as it was only a warning and the return value was ignored!
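   
   For illustration, the shape of that fix as a hedged sketch (illustrative names, not the actual commit):
   
    ```
    import contextlib

    from sqlalchemy import text


    @contextlib.contextmanager
    def global_lock(conn, lock_id: int):
        """Hold a Postgres session-level advisory lock for the duration of the block."""
        conn.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock_id})
        try:
            # Callers (e.g. the migration code) must keep using *this* connection,
            # so it stays open and still owns the lock afterwards.
            yield conn
        finally:
            # Same connection, so Postgres still sees us as the lock owner.
            conn.execute(text("SELECT pg_advisory_unlock(:id)"), {"id": lock_id})
    ```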





[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757579051



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)

Review comment:
       https://www.postgresql.org/docs/9.1/functions-admin.html -- casing as per the docs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] AhmadBenMaallem commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
AhmadBenMaallem commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-1015278186


   Has this fix been integrated into an official Docker image?





[GitHub] [airflow] kaxil commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-981620635


   Hmm tests are failing:
   
   ```
    INFO  [alembic.runtime.migration] Running upgrade 7b2661a43ba3 -> be2bfac3da23, Add has_import_errors column to DagModel
     WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
     Traceback (most recent call last):
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
         cursor, statement, parameters, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
         cursor.execute(statement, parameters)
     psycopg2.errors.LockNotAvailable: canceling statement due to lock timeout
     
     
     The above exception was the direct cause of the following exception:
     
     Traceback (most recent call last):
       File "/opt/airflow/airflow/utils/db.py", line 1017, in create_global_lock
         conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
         return meth(self, multiparams, params)
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
         return connection._execute_clauseelement(self, multiparams, params)
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
         distilled_params,
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
         e, statement, parameters, cursor, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
         sqlalchemy_exception, with_traceback=exc_info[2], from_=e
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
         raise exception
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
         cursor, statement, parameters, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
         cursor.execute(statement, parameters)
     sqlalchemy.exc.OperationalError: (psycopg2.errors.LockNotAvailable) canceling statement due to lock timeout
     
     [SQL: SELECT pg_advisory_lock(%(id)s)]
     [parameters: {'id': 1}]
     (Background on this error at: http://sqlalche.me/e/13/e3q8)
     
     During handling of the above exception, another exception occurred:
     
     Traceback (most recent call last):
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
         cursor, statement, parameters, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
         cursor.execute(statement, parameters)
     psycopg2.errors.InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
     
     
     The above exception was the direct cause of the following exception:
     
     Traceback (most recent call last):
       File "/usr/local/bin/airflow", line 33, in <module>
         sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
       File "/opt/airflow/airflow/__main__.py", line 48, in main
         args.func(args)
       File "/opt/airflow/airflow/cli/cli_parser.py", line 48, in command
         return func(*args, **kwargs)
       File "/opt/airflow/airflow/cli/commands/db_command.py", line 39, in resetdb
         db.resetdb()
       File "/opt/airflow/airflow/utils/session.py", line 69, in wrapper
         return func(*args, session=session, **kwargs)
       File "/opt/airflow/airflow/utils/db.py", line 932, in resetdb
         initdb(session=session)
       File "/opt/airflow/airflow/utils/session.py", line 66, in wrapper
         return func(*args, **kwargs)
       File "/opt/airflow/airflow/utils/db.py", line 599, in initdb
         with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
       File "/usr/local/lib/python3.7/contextlib.py", line 112, in __enter__
         return next(self.gen)
       File "/opt/airflow/airflow/utils/db.py", line 1030, in create_global_lock
         conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1003, in execute
         return self._execute_text(object_, multiparams, params)
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1178, in _execute_text
         parameters,
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
         e, statement, parameters, cursor, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
         sqlalchemy_exception, with_traceback=exc_info[2], from_=e
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
         raise exception
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
         cursor, statement, parameters, context
       File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
         cursor.execute(statement, parameters)
     sqlalchemy.exc.InternalError: (psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block
     
     [SQL: SET LOCK_TIMEOUT TO DEFAULT]
     (Background on this error at: http://sqlalche.me/e/13/2j85)
   ```





[GitHub] [airflow] potiuk commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-980036030


   Oh nice. Will take a close look later :)





[GitHub] [airflow] ashb edited a comment on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb edited a comment on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-981622670


   Interesting.... it was passing without changing the lock. Let me think.
   
   Edit: I'm thinking of the wrong PR here!





[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758380157



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        # The session may have been "closed" (which is fine, the lock lasts more than a transaction) -- ensure
+        # we get a usable connection
+        conn = session.connection()

Review comment:
       > WARNING:  you don't own a lock of type ExclusiveLock
   







[GitHub] [airflow] ashb commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-1015300119


   No, as this hasn't been included in a release yet -- it's slated for 2.3.
   
   And while it might be annoying, seeing these in the DB logs is _not an error_ as far as Airflow is concerned.





[GitHub] [airflow] github-actions[bot] commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-981620517


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.








[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758379886



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,49 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    MIGRATIONS = enum.auto()
+    SCHEDULER_CRITICAL_SECTION = enum.auto()
+
+    def __str__(self):
+        return f"airflow_{self._name_}"
+
+
+@contextlib.contextmanager
+def create_global_lock(session, lock: DBLocks, lock_timeout=1800):
+    """Contextmanager that will create and teardown a global db lock."""
+    conn = session.connection()
+    dialect = conn.dialect
+    try:
+        if dialect.name == 'postgresql':
+            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
+            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
+        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
+            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
+        elif dialect.name == 'mssql':
+            # TODO: make locking works for MSSQL
+            pass
+
+        yield None
+    finally:
+        # The session may have been "closed" (which is fine, the lock lasts more than a transaction) -- ensure
+        # we get a usable connection
+        conn = session.connection()

Review comment:
    This is the problem and the cause of the failures -- if we close the connection, the new connection doesn't own the lock, so we end up trying to unlock someone else's lock.
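    
    A standalone demo of the failure mode (assumes psycopg2 and a reachable "airflow" database; purely illustrative):
    
    ```
    import psycopg2

    conn_a = psycopg2.connect(dbname="airflow")
    conn_b = psycopg2.connect(dbname="airflow")

    with conn_a.cursor() as cur:
        cur.execute("SELECT pg_advisory_lock(42)")  # conn_a's session owns lock 42

    with conn_b.cursor() as cur:
        # A different session cannot release it: this returns false and the server
        # logs "WARNING:  you don't own a lock of type ExclusiveLock"
        cur.execute("SELECT pg_advisory_unlock(42)")
        print(cur.fetchone()[0])  # False
    ```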







[GitHub] [airflow] ashb merged pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #19842:
URL: https://github.com/apache/airflow/pull/19842


   





[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r757580273



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -234,8 +234,25 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
         :type max_tis: int
         :return: list[airflow.models.TaskInstance]
         """
+        from airflow.utils.db import DBLocks
+
         executable_tis: List[TI] = []
 
+        if session.get_bind().dialect.name == "postgresql":
+            # Optimization: to avoid littering the DB errors of "ERROR: canceling statement due to lock
+            # timeout", try to take out a transactional advisory lock (unlocks automatically on
+            # COMMIT/ROLLBACK)
+            lock_acquired = session.execute(
+                text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
+                    id=DBLocks.SCHEDULER_CRITICAL_SECTION

Review comment:
    Thanks -- it works anyway (as it's an IntEnum), but better to be explicit, I agree.
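    
    (For the record, a quick generic-Python illustration of why both spellings work:)
    
    ```
    import enum

    class DBLocks(enum.IntEnum):
        INIT = enum.auto()
        MIGRATIONS = enum.auto()
        SCHEDULER_CRITICAL_SECTION = enum.auto()

    # IntEnum members *are* ints, so a driver that accepts ints accepts the member
    # directly -- but passing .value makes the intent explicit.
    assert isinstance(DBLocks.SCHEDULER_CRITICAL_SECTION, int)
    assert DBLocks.SCHEDULER_CRITICAL_SECTION == DBLocks.SCHEDULER_CRITICAL_SECTION.value == 3
    ```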







[GitHub] [airflow] ashb commented on a change in pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#discussion_r758261829



##########
File path: airflow/utils/db.py
##########
@@ -986,3 +988,47 @@ def check(session=None):
     """
     session.execute('select 1 as is_alive;')
     log.info("Connection successful.")
+
+
+@enum.unique
+class DBLocks(enum.IntEnum):
+    """
+    Cross-db Identifiers for advisory global database locks.
+
+    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
+    field.
+    """
+
+    INIT = enum.auto()
+    MIGRATIONS = enum.auto()

Review comment:
       Fixed in f18e2cdf2







[GitHub] [airflow] ashb commented on pull request #19842: Avoid littering postgres server logs with "could not obtain lock" with HA schedulers

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19842:
URL: https://github.com/apache/airflow/pull/19842#issuecomment-980031299


   This could _probably_ go into 2.2.x, but I've marked it as 2.3 as I haven't tested this extensively yet.

