You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:07:53 UTC

[airflow] branch v2-5-test updated: Row-lock TIs to be removed during mapped task expansion (#28689)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new 6285c4e71b Row-lock TIs to be removed during mapped task expansion (#28689)
6285c4e71b is described below

commit 6285c4e71b79b8994da0cf9b8b7e8942ec0a2110
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jan 4 07:21:44 2023 +0100

    Row-lock TIs to be removed during mapped task expansion (#28689)
    
    Instead of a bulk query-update, we row-lock the affected TIs and then apply the update to each one.
    This protects against overwriting a row that another process has concurrently updated.
    
    (cherry picked from commit a055d8fd9b42ae662e0c696e29066926b5346f6a)
---
 airflow/models/abstractoperator.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index ba0a8954ae..d693f8bfc9 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -31,6 +31,7 @@ from airflow.utils.helpers import render_template_as_native, render_template_to_
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.mixins import ResolveMixin
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.sqlalchemy import skip_locked, with_row_locks
 from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.task_group import MappedTaskGroup
 from airflow.utils.trigger_rule import TriggerRule
@@ -548,13 +549,15 @@ class AbstractOperator(LoggingMixin, DAGNode):
 
         # Any (old) task instances with inapplicable indexes (>= the total
         # number we need) are set to "REMOVED".
-        session.query(TaskInstance).filter(
+        query = session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id == self.task_id,
             TaskInstance.run_id == run_id,
             TaskInstance.map_index >= total_expanded_ti_count,
-        ).update({TaskInstance.state: TaskInstanceState.REMOVED})
-
+        )
+        to_update = with_row_locks(query, of=TaskInstance, session=session, **skip_locked(session=session))
+        for ti in to_update:
+            ti.state = TaskInstanceState.REMOVED
         session.flush()
         return all_expanded_tis, total_expanded_ti_count - 1