You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/01/21 00:06:58 UTC

[airflow] 06/23: Adds retry on taskinstance retrieval lock (#20030)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 94865f9c6b780ab80bc78f6287752c426e769c60
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Dec 7 16:05:47 2021 +0100

    Adds retry on taskinstance retrieval lock (#20030)
    
    Fixes: #19832
    
    Co-authored-by: Jaroslaw Potiuk <ja...@Jaroslaws-MacBook-Pro.local>
    (cherry picked from commit 78c815e22b67e442982b53f41d7d899723d5de9f)
---
 airflow/models/taskinstance.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 6e9862e..f37cada 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -91,6 +91,7 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
+from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import DagRunState, State
@@ -723,7 +724,9 @@ class TaskInstance(Base, LoggingMixin):
         )
 
         if lock_for_update:
-            ti: Optional[TaskInstance] = qry.with_for_update().first()
+            for attempt in run_with_db_retries(logger=self.log):
+                with attempt:
+                    ti: Optional[TaskInstance] = qry.with_for_update().first()
         else:
             ti = qry.first()
         if ti: