You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/01 17:54:49 UTC
[airflow] branch main updated: Handle occasional deadlocks in trigger with retries (#24071)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d86ae09035 Handle occasional deadlocks in trigger with retries (#24071)
d86ae09035 is described below
commit d86ae090350de97e385ca4aaf128235f4c21f158
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Wed Jun 1 19:54:40 2022 +0200
Handle occasional deadlocks in trigger with retries (#24071)
Fixes: #23639
---
airflow/models/trigger.py | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 24e62789b5..dc91fdccdf 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -24,6 +24,7 @@ from airflow.models.base import Base
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
+from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import State
@@ -88,9 +89,11 @@ class Trigger(Base):
(triggers have a one-to-many relationship to both)
"""
# Update all task instances with trigger IDs that are not DEFERRED to remove them
- session.query(TaskInstance).filter(
- TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
- ).update({TaskInstance.trigger_id: None})
+ for attempt in run_with_db_retries():
+ with attempt:
+ session.query(TaskInstance).filter(
+ TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
+ ).update({TaskInstance.trigger_id: None})
# Get all triggers that have no task instances depending on them...
ids = [
trigger_id