Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/28 15:47:26 UTC

[GitHub] [airflow] dstandish commented on a diff in pull request #27344: Add retry to submit_event in trigger to avoid deadlock

dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008211688


##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
         Takes an event from an instance of itself, and triggers all dependent
         tasks to resume.
         """
-        for task_instance in session.query(TaskInstance).filter(
-            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
-        ):
-            # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
-            next_kwargs["event"] = event.payload
-            task_instance.next_kwargs = next_kwargs
-            # Remove ourselves as its trigger
-            task_instance.trigger_id = None
-            # Finally, mark it as scheduled so it gets re-queued
-            task_instance.state = State.SCHEDULED
+        for attempt in run_with_db_retries():
+            with attempt:
+                for task_instance in session.query(TaskInstance).filter(
+                    TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+                ):
+                    # Add the event's payload into the kwargs for the task
+                    next_kwargs = task_instance.next_kwargs or {}
+                    next_kwargs["event"] = event.payload
+                    task_instance.next_kwargs = next_kwargs
+                    # Remove ourselves as its trigger
+                    task_instance.trigger_id = None
+                    # Finally, mark it as scheduled so it gets re-queued
+                    task_instance.state = State.SCHEDULED
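
The `for attempt in run_with_db_retries(): with attempt:` loop in the diff follows the iterate-over-attempts pattern of the tenacity library. As far as we can tell, `run_with_db_retries` lives in `airflow.utils.retries` and wraps tenacity's `Retrying`. Below is a minimal, stdlib-only sketch of how such a loop can work. It is not Airflow's or tenacity's code. `retrying_attempts`, `_Attempt` and `TransientDBError` are hypothetical names, and the sketch assumes a fixed attempt count with a constant pause instead of real backoff. Each attempt is a context manager that swallows retryable errors. The generator stops as soon as an attempt finishes cleanly, and re-raises the last error once attempts run out.

```python
import time


class TransientDBError(Exception):
    """Hypothetical stand-in for a retryable DB error such as a deadlock."""


class _Attempt:
    """One attempt: a context manager that records and swallows retryable errors."""

    def __init__(self, number, retry_on):
        self.number = number
        self.retry_on = retry_on
        self.error = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None and issubclass(exc_type, self.retry_on):
            self.error = exc
            return True  # suppress it so the outer loop can try again
        return False  # success, or a non-retryable error that should propagate


def retrying_attempts(max_attempts=3, retry_on=(TransientDBError,), wait_seconds=0.0):
    """Yield attempts until one completes without a retryable error."""
    for number in range(1, max_attempts + 1):
        attempt = _Attempt(number, retry_on)
        yield attempt  # the caller's `with attempt:` body runs here
        if attempt.error is None:
            return  # body succeeded: stop iterating
        if number == max_attempts:
            raise attempt.error  # out of attempts: surface the last error
        time.sleep(wait_seconds)  # fixed pause before the next attempt


# Usage: an operation that hits a simulated deadlock twice, then succeeds.
calls = []


def flaky_update():
    calls.append(len(calls) + 1)
    if len(calls) < 3:
        raise TransientDBError("deadlock detected")
    return "ok"


for attempt in retrying_attempts(max_attempts=5):
    with attempt:
        result = flaky_update()

print(result, len(calls))  # ok 3
```

The whole body runs inside `with attempt:`, so any side effects performed before the error (here, appending to `calls`) happen again on every retry. That is one reason to keep the retried block small.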

Review Comment:
   ```suggestion
           for attempt in run_with_db_retries():
               with attempt:
                   tis = session.query(TaskInstance).filter(
                       TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
                   )
   
           for task_instance in tis:
               # Add the event's payload into the kwargs for the task
               next_kwargs = task_instance.next_kwargs or {}
               next_kwargs["event"] = event.payload
               task_instance.next_kwargs = next_kwargs
               # Remove ourselves as its trigger
               task_instance.trigger_id = None
               # Finally, mark it as scheduled so it gets re-queued
               task_instance.state = State.SCHEDULED
   ```
   
   I think separating the retry logic from the actual work makes things a little more digestible.
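
The suggestion retries only the query and performs the per-row updates outside the retry loop. One detail deserves a check. In SQLAlchemy, `session.query(...).filter(...)` returns a lazy `Query`, and SQL is only emitted when that query is iterated. As written, the SELECT would therefore run in the later `for task_instance in tis` loop rather than inside `with attempt:`. Materializing the rows inside the retried block, for example with `.all()`, keeps the read under retry. The thread also does not say whether the deadlock surfaces on the SELECT or later, when the updates are flushed.

The sketch below shows the separated shape using plain Python objects instead of a database. `FakeTaskInstance`, `FlakyStore`, `call_with_retries` and `submit_event_sketch` are hypothetical stand-ins, and the simulated `TransientDBError` is assumed to play the role of a deadlock error.

```python
from dataclasses import dataclass
from typing import Optional

DEFERRED, SCHEDULED = "deferred", "scheduled"


class TransientDBError(Exception):
    """Hypothetical stand-in for a retryable DB error such as a deadlock."""


@dataclass
class FakeTaskInstance:
    trigger_id: Optional[int]
    state: str
    next_kwargs: Optional[dict] = None


class FlakyStore:
    """Hypothetical in-memory table whose first `failures` reads raise."""

    def __init__(self, rows, failures):
        self.rows = rows
        self.failures = failures
        self.reads = 0

    def deferred_for(self, trigger_id):
        self.reads += 1
        if self.reads <= self.failures:
            raise TransientDBError("deadlock detected")
        # Return a concrete list (like Query.all()), so the read is finished here.
        return [ti for ti in self.rows if ti.trigger_id == trigger_id and ti.state == DEFERRED]


def call_with_retries(fn, retry_on, max_attempts=3):
    """Retry logic only: call fn until it succeeds or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise


def submit_event_sketch(store, trigger_id, payload):
    # 1. Retry only the read, and get back materialized rows.
    tis = call_with_retries(lambda: store.deferred_for(trigger_id), TransientDBError)
    # 2. The actual work, kept outside the retry logic.
    for ti in tis:
        next_kwargs = ti.next_kwargs or {}
        next_kwargs["event"] = payload  # add the event's payload to the task kwargs
        ti.next_kwargs = next_kwargs
        ti.trigger_id = None  # detach the trigger
        ti.state = SCHEDULED  # mark it so it gets re-queued
    return len(tis)


rows = [
    FakeTaskInstance(7, DEFERRED),
    FakeTaskInstance(7, "running"),
    FakeTaskInstance(8, DEFERRED),
]
store = FlakyStore(rows, failures=2)
updated = submit_event_sketch(store, 7, {"status": "done"})
print(updated, store.reads, rows[0].state)  # 1 3 scheduled
```

Because the retried callable has no side effects, a retry simply re-reads the rows. The mutations run exactly once, on the final result.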



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
