You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/28 15:47:26 UTC
[GitHub] [airflow] dstandish commented on a diff in pull request #27344: Add retry to submit_event in trigger to avoid deadlock
dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008211688
##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
Takes an event from an instance of itself, and triggers all dependent
tasks to resume.
"""
- for task_instance in session.query(TaskInstance).filter(
- TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
- ):
- # Add the event's payload into the kwargs for the task
- next_kwargs = task_instance.next_kwargs or {}
- next_kwargs["event"] = event.payload
- task_instance.next_kwargs = next_kwargs
- # Remove ourselves as its trigger
- task_instance.trigger_id = None
- # Finally, mark it as scheduled so it gets re-queued
- task_instance.state = State.SCHEDULED
+ for attempt in run_with_db_retries():
+ with attempt:
+ for task_instance in session.query(TaskInstance).filter(
+ TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+ ):
+ # Add the event's payload into the kwargs for the task
+ next_kwargs = task_instance.next_kwargs or {}
+ next_kwargs["event"] = event.payload
+ task_instance.next_kwargs = next_kwargs
+ # Remove ourselves as its trigger
+ task_instance.trigger_id = None
+ # Finally, mark it as scheduled so it gets re-queued
+ task_instance.state = State.SCHEDULED
Review Comment:
```suggestion
for attempt in run_with_db_retries():
with attempt:
tis = session.query(TaskInstance).filter(
TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
)
for task_instance in tis:
# Add the event's payload into the kwargs for the task
next_kwargs = task_instance.next_kwargs or {}
next_kwargs["event"] = event.payload
task_instance.next_kwargs = next_kwargs
# Remove ourselves as its trigger
task_instance.trigger_id = None
# Finally, mark it as scheduled so it gets re-queued
task_instance.state = State.SCHEDULED
```
i think separating the retry logic from the actual work makes things a little more digestable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org