You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:25 UTC
[airflow] 06/16: Handle occasional deadlocks in trigger with retries (#24071)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4284d03e98df5701a3e41cbf0826b2dccbefacee
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200
Handle occasional deadlocks in trigger with retries (#24071)
Fixes: #23639
(cherry picked from commit d86ae090350de97e385ca4aaf128235f4c21f158)
---
airflow/models/trigger.py | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index c1ccdd4964..2f332393f9 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
(triggers have a one-to-many relationship to both)
"""
# Update all task instances with trigger IDs that are not DEFERRED to remove them
- session.query(TaskInstance).filter(
- TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
- ).update({TaskInstance.trigger_id: None})
+ for attempt in run_with_db_retries():
+ with attempt:
+ session.query(TaskInstance).filter(
+ TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+ ).update({TaskInstance.trigger_id: None})
# Get all triggers that have no task instances depending on them...
ids = [
trigger_id