You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:07:53 UTC
[airflow] branch v2-5-test updated: Row-lock TIs to be removed during mapped task expansion (#28689)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new 6285c4e71b Row-lock TIs to be removed during mapped task expansion (#28689)
6285c4e71b is described below
commit 6285c4e71b79b8994da0cf9b8b7e8942ec0a2110
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jan 4 07:21:44 2023 +0100
Row-lock TIs to be removed during mapped task expansion (#28689)
Instead of issuing a bulk query-update, we row-lock the affected TIs and then apply the update to each locked row.
This protects against overwriting a row that is concurrently being updated by another process.
(cherry picked from commit a055d8fd9b42ae662e0c696e29066926b5346f6a)
---
airflow/models/abstractoperator.py | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index ba0a8954ae..d693f8bfc9 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -31,6 +31,7 @@ from airflow.utils.helpers import render_template_as_native, render_template_to_
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import ResolveMixin
from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import skip_locked, with_row_locks
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.trigger_rule import TriggerRule
@@ -548,13 +549,15 @@ class AbstractOperator(LoggingMixin, DAGNode):
# Any (old) task instances with inapplicable indexes (>= the total
# number we need) are set to "REMOVED".
- session.query(TaskInstance).filter(
+ query = session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == run_id,
TaskInstance.map_index >= total_expanded_ti_count,
- ).update({TaskInstance.state: TaskInstanceState.REMOVED})
-
+ )
+ to_update = with_row_locks(query, of=TaskInstance, session=session, **skip_locked(session=session))
+ for ti in to_update:
+ ti.state = TaskInstanceState.REMOVED
session.flush()
return all_expanded_tis, total_expanded_ti_count - 1