You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/24 05:37:54 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #27072: Resolve trigger assignment race condition

uranusjr commented on code in PR #27072:
URL: https://github.com/apache/airflow/pull/27072#discussion_r1002895940


##########
airflow/models/trigger.py:
##########
@@ -196,15 +196,16 @@ def assign_unassigned(cls, triggerer_id, capacity, session=None):
 
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
-        trigger_ids_query = (
+        trigger_ids_query = with_row_locks(
             session.query(cls.id)
-            # notin_ doesn't find NULL rows
             .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
-            .limit(capacity)
-            .all()
-        )
-        session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
-            {cls.triggerer_id: triggerer_id},
-            synchronize_session=False,
-        )
+            .limit(capacity),
+            session,
+            skip_locked=True,
+        ).all()
+        if trigger_ids_query:
+            session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(

Review Comment:
   ```suggestion
               session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
   ```
   
   Would it be possible to transform `trigger_ids_query` to a subquery so this can be done in one db call instead of two?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org