You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/21 00:06:58 UTC
[airflow] 06/23: Adds retry on taskinstance retrieval lock (#20030)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 94865f9c6b780ab80bc78f6287752c426e769c60
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Dec 7 16:05:47 2021 +0100
Adds retry on taskinstance retrieval lock (#20030)
Fixes: #19832
Co-authored-by: Jaroslaw Potiuk <ja...@Jaroslaws-MacBook-Pro.local>
(cherry picked from commit 78c815e22b67e442982b53f41d7d899723d5de9f)
---
airflow/models/taskinstance.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6e9862e..f37cada 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -91,6 +91,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
from airflow.utils.operator_helpers import context_to_airflow_vars
from airflow.utils.platform import getuser
+from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import create_session, provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import DagRunState, State
@@ -723,7 +724,9 @@ class TaskInstance(Base, LoggingMixin):
)
if lock_for_update:
- ti: Optional[TaskInstance] = qry.with_for_update().first()
+ for attempt in run_with_db_retries(logger=self.log):
+ with attempt:
+ ti: Optional[TaskInstance] = qry.with_for_update().first()
else:
ti = qry.first()
if ti: