You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/01 17:54:49 UTC

[airflow] branch main updated: Handle occasional deadlocks in trigger with retries (#24071)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d86ae09035 Handle occasional deadlocks in trigger with retries (#24071)
d86ae09035 is described below

commit d86ae090350de97e385ca4aaf128235f4c21f158
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200

    Handle occasional deadlocks in trigger with retries (#24071)
    
    Fixes: #23639
---
 airflow/models/trigger.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 24e62789b5..dc91fdccdf 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
         (triggers have a one-to-many relationship to both)
         """
         # Update all task instances with trigger IDs that are not DEFERRED to remove them
-        session.query(TaskInstance).filter(
-            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
-        ).update({TaskInstance.trigger_id: None})
+        for attempt in run_with_db_retries():
+            with attempt:
+                session.query(TaskInstance).filter(
+                    TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+                ).update({TaskInstance.trigger_id: None})
         # Get all triggers that have no task instances depending on them...
         ids = [
             trigger_id