You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:25 UTC

[airflow] 06/16: Handle occasional deadlocks in trigger with retries (#24071)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4284d03e98df5701a3e41cbf0826b2dccbefacee
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200

    Handle occasional deadlocks in trigger with retries (#24071)
    
    Fixes: #23639
    (cherry picked from commit d86ae090350de97e385ca4aaf128235f4c21f158)
---
 airflow/models/trigger.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c1ccdd4964..2f332393f9 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
         (triggers have a one-to-many relationship to both)
         """
         # Update all task instances with trigger IDs that are not DEFERRED to remove them
-        session.query(TaskInstance).filter(
-            TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
-        ).update({TaskInstance.trigger_id: None})
+        for attempt in run_with_db_retries():
+            with attempt:
+                session.query(TaskInstance).filter(
+                    TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+                ).update({TaskInstance.trigger_id: None})
         # Get all triggers that have no task instances depending on them...
         ids = [
             trigger_id