Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/10 18:33:09 UTC

[airflow] 14/18: Avoid deadlock when rescheduling task (#21362)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b1e3572665ebf98b8f9a497bf7ec6b2308a61c68
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Feb 7 20:12:05 2022 +0100

    Avoid deadlock when rescheduling task (#21362)
    
    The scheduler job performs scheduling after locking the "scheduled"
    DagRun row for writing. This should prevent another scheduler, or the
    "mini-scheduler" that runs after a task completes, from modifying the
    DagRun and its related task instances.
    
    However, there is apparently one more case where the DagRun is
    locked by "Task" processes, namely when a task raises
    AirflowRescheduleException. In this case a new "TaskReschedule"
    entity is inserted into the database, and the insert also takes a lock
    on the DagRun (because TaskReschedule has a "DagRun" relationship).
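The deadlock described above can be sketched with two Python threads standing in for two database transactions. This is a simplified analogy, not Airflow code: the lock names are hypothetical, it assumes the task process already holds a lock the scheduler needs next (modelled here as a task-instance row), and a timed-out `acquire` stands in for the database's deadlock detection aborting a transaction.

```python
import threading


def simulate_opposite_lock_order(timeout=0.2):
    """Two workers take two locks in opposite order; report who got both."""
    # Hypothetical stand-ins for database row locks, not Airflow or DB APIs.
    dag_run_row = threading.Lock()
    task_row = threading.Lock()
    first_locks_held = threading.Barrier(2)  # both hold their first lock
    attempts_done = threading.Barrier(2)     # both finished trying the second
    results = {}

    def worker(name, first, second):
        with first:
            first_locks_held.wait()
            # A real database would detect the wait cycle and abort one
            # transaction; here each worker simply gives up after `timeout`.
            got_second = second.acquire(timeout=timeout)
            # Keep holding `first` until both attempts are over, so the
            # outcome does not depend on thread timing.
            attempts_done.wait()
            if got_second:
                second.release()
        results[name] = got_second

    threads = [
        # Scheduler path: DagRun row first, then the task row.
        threading.Thread(target=worker, args=("scheduler", dag_run_row, task_row)),
        # Reschedule path before the fix: task row first, then DagRun row.
        threading.Thread(target=worker, args=("task", task_row, dag_run_row)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(simulate_opposite_lock_order())
```

Neither worker ever obtains its second lock, because each is waiting on the lock the other holds.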
    
    This PR modifies the handling of AirflowRescheduleException so that it
    obtains the very same DagRun lock before it attempts to insert the
    TaskReschedule entity.
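Under the same hypothetical two-lock model, the fix amounts to making both paths take the DagRun lock first. A minimal sketch, again using threads as stand-ins for transactions rather than Airflow's actual `with_row_locks` query:

```python
import threading


def run_with_consistent_lock_order(rounds=200):
    """Both workers lock the DagRun stand-in first, mirroring the fix."""
    # Hypothetical stand-ins for database row locks, not Airflow or DB APIs.
    dag_run_row = threading.Lock()
    task_row = threading.Lock()
    completed = {"scheduler": 0, "task": 0}

    def worker(name):
        for _ in range(rounds):
            # Same order in every worker: DagRun row first, then the other row.
            with dag_run_row:
                with task_row:
                    completed[name] += 1

    threads = [threading.Thread(target=worker, args=(n,)) for n in completed]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=5)
    # With a single global lock order no cycle of waiters can form.
    assert not any(t.is_alive() for t in threads)
    return completed


print(run_with_consistent_lock_order())  # prints {'scheduler': 200, 'task': 200}
```

Because every worker acquires the locks in the same global order, both always finish. In the real patch, the first lock is the DagRun row lock taken through `with_row_locks`, which presumably emits a `SELECT ... FOR UPDATE` on backends that support row-level locking.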
    
    It seems that TaskReschedule is the only entity with this relationship,
    so all the mysterious SchedulerJob deadlock cases we have experienced
    might be explained (and fixed) by this change.
    
    It is likely that this change:
    
    * Fixes: #16982
    * Fixes: #19957
    
    (cherry picked from commit 6d110b565a505505351d1ff19592626fb24e4516)
---
 airflow/models/taskinstance.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ec34156..2dcc923 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
 from airflow.utils.state import DagRunState, State
 from airflow.utils.timeout import timeout
 
@@ -1657,11 +1657,24 @@ class TaskInstance(Base, LoggingMixin):
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()
+
         # Log reschedule request
         session.add(
             TaskReschedule(