Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/17 21:39:20 UTC
[airflow] 29/43: Avoid deadlock when rescheduling task (#21362)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit f2fe0df6b3caa86a4315322264fad077f03b32e6
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Feb 7 20:12:05 2022 +0100
Avoid deadlock when rescheduling task (#21362)
The scheduler job performs scheduling after locking the "scheduled"
DagRun row for writing. This should prevent another scheduler, or the
"mini-scheduler" that runs after a task completes, from modifying the
DagRun and its related task instances.
However, there is apparently one more case where the DagRun is
locked by "Task" processes, namely when a task raises
AirflowRescheduleException. In this case a new "TaskReschedule"
entity is inserted into the database, and the insert also takes a lock
on the DagRun (because TaskReschedule has a "DagRun" relationship).
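A toy model can make this deadlock concrete. The sketch below builds a wait-for graph, in which an edge A -> B means transaction A is blocked on a lock held by B, and reports any cycle (a cycle in this graph is a deadlock). The helper names `waits_for_graph` and `find_cycle` are hypothetical. The locks are simplified to one "dag_run" row and one "task_instance" row. The assumption that the task process already holds a row the scheduler needs when it inserts TaskReschedule is ours and is not stated in this commit.

```python
def waits_for_graph(holders, wants):
    """Build a wait-for graph: txn -> the txn holding the lock it is blocked on.

    holders: lock name -> transaction currently holding it
    wants:   transaction -> lock name it is trying to acquire
    """
    graph = {}
    for txn, lock in wants.items():
        holder = holders.get(lock)
        if holder is not None and holder != txn:
            graph[txn] = holder
    return graph


def find_cycle(graph):
    """Return the transactions on a wait-for cycle (a deadlock), or None."""
    for start in graph:
        path = []
        node = start
        # Each transaction waits on at most one other, so just follow the chain.
        while node in graph:
            if node in path:
                return path[path.index(node):]
            path.append(node)
            node = graph[node]
    return None


# Before the fix (assumed state): the scheduler holds the DagRun row and wants the
# task's row; the task process holds its row and wants the DagRun for TaskReschedule.
before_fix = waits_for_graph(
    holders={"dag_run": "scheduler", "task_instance": "task"},
    wants={"scheduler": "task_instance", "task": "dag_run"},
)
print(find_cycle(before_fix))  # ['scheduler', 'task']

# After the fix: the task process asks for the DagRun lock before holding anything
# the scheduler needs, so it simply waits its turn.
after_fix = waits_for_graph(
    holders={"dag_run": "scheduler"},
    wants={"scheduler": "task_instance", "task": "dag_run"},
)
print(find_cycle(after_fix))  # None
```

Under this model, the task process that requests the DagRun lock first is blocked while holding nothing the scheduler needs. The graph therefore has a single edge and no cycle.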
This PR modifies the handling of AirflowRescheduleException so that it
obtains the very same DagRun lock before it attempts to insert the
TaskReschedule entity.
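The fix amounts to making both processes take their locks in the same order, with the DagRun lock first. Here is a minimal sketch of why that helps, using Python threads rather than database rows. Each hypothetical "transaction" grabs two `threading.Lock` objects in a given order, and an acquire timeout stands in for the database's deadlock detector. The function name, lock names and timings are illustrative assumptions, not Airflow code.

```python
import threading


def run_two_transactions(scheduler_order, task_order, timeout=1.0):
    """Run a "scheduler" and a "task" thread that each take two locks in the given order.

    Returns a dict mapping worker name -> True if it obtained both locks, or
    False if it gave up after `timeout` seconds (our stand-in for the database
    aborting a deadlocked transaction).
    """
    # Hypothetical stand-ins for the DagRun row lock and a TaskInstance row lock.
    locks = {"dag_run": threading.Lock(), "task_instance": threading.Lock()}
    holding_first = {"scheduler": threading.Event(), "task": threading.Event()}
    start = threading.Barrier(2)
    results = {}

    def worker(name, other, order):
        first, second = (locks[n] for n in order)
        start.wait()  # begin both "transactions" at the same moment
        if not first.acquire(timeout=timeout):
            results[name] = False
            return
        try:
            holding_first[name].set()
            # Briefly wait for the other worker to take its first lock, so that
            # opposite lock orders reliably produce the circular wait.
            holding_first[other].wait(timeout=timeout / 4)
            got_second = second.acquire(timeout=timeout)
            if got_second:
                second.release()
            results[name] = got_second
        finally:
            first.release()

    threads = [
        threading.Thread(target=worker, args=("scheduler", "task", scheduler_order)),
        threading.Thread(target=worker, args=("task", "scheduler", task_order)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


DAG_RUN_FIRST = ("dag_run", "task_instance")

# After the fix: both processes lock the DagRun first.
print(run_two_transactions(DAG_RUN_FIRST, DAG_RUN_FIRST))
# Before the fix (under our assumption): the task process effectively locks in the opposite order.
print(run_two_transactions(DAG_RUN_FIRST, ("task_instance", "dag_run")))
```

With the same order for both workers, the second one simply waits until the first finishes, and both should report True. With opposite orders, each holds the lock the other wants, and at least one worker times out. This is the analogue of the database aborting one transaction as a deadlock victim.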
It seems that TaskReschedule is the only entity with this relationship,
so this change may explain (and fix) all of the mysterious SchedulerJob
deadlock cases we have experienced.
It is likely that this change:
* Fixes: #16982
* Fixes: #19957
(cherry picked from commit 6d110b565a505505351d1ff19592626fb24e4516)
---
airflow/models/taskinstance.py | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ec34156..2dcc923 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -93,7 +93,7 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
 from airflow.utils.platform import getuser
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
 from airflow.utils.state import DagRunState, State
 from airflow.utils.timeout import timeout
@@ -1657,11 +1657,24 @@ class TaskInstance(Base, LoggingMixin):
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()
+
         # Log reschedule request
         session.add(
             TaskReschedule(