Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/28 10:26:00 UTC

[GitHub] [airflow] NickYadance opened a new pull request, #27344: Add retry to submit_event in trigger to avoid deadlock

NickYadance opened a new pull request, #27344:
URL: https://github.com/apache/airflow/pull/27344

   related: https://github.com/apache/airflow/issues/27000


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008214523


##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
         Takes an event from an instance of itself, and triggers all dependent
         tasks to resume.
         """
-        for task_instance in session.query(TaskInstance).filter(
-            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
-        ):
-            # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
-            next_kwargs["event"] = event.payload
-            task_instance.next_kwargs = next_kwargs
-            # Remove ourselves as its trigger
-            task_instance.trigger_id = None
-            # Finally, mark it as scheduled so it gets re-queued
-            task_instance.state = State.SCHEDULED
+        for attempt in run_with_db_retries():
+            with attempt:
+                for task_instance in session.query(TaskInstance).filter(
+                    TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+                ):
+                    # Add the event's payload into the kwargs for the task
+                    next_kwargs = task_instance.next_kwargs or {}
+                    next_kwargs["event"] = event.payload
+                    task_instance.next_kwargs = next_kwargs
+                    # Remove ourselves as its trigger
+                    task_instance.trigger_id = None
+                    # Finally, mark it as scheduled so it gets re-queued
+                    task_instance.state = State.SCHEDULED

Review Comment:
   actually that might not work





[GitHub] [airflow] dstandish commented on a diff in pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008216707


##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
         Takes an event from an instance of itself, and triggers all dependent
         tasks to resume.
         """
-        for task_instance in session.query(TaskInstance).filter(
-            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
-        ):
-            # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
-            next_kwargs["event"] = event.payload
-            task_instance.next_kwargs = next_kwargs
-            # Remove ourselves as its trigger
-            task_instance.trigger_id = None
-            # Finally, mark it as scheduled so it gets re-queued
-            task_instance.state = State.SCHEDULED
+        for attempt in run_with_db_retries():
+            with attempt:
+                for task_instance in session.query(TaskInstance).filter(
+                    TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+                ):
+                    # Add the event's payload into the kwargs for the task
+                    next_kwargs = task_instance.next_kwargs or {}
+                    next_kwargs["event"] = event.payload
+                    task_instance.next_kwargs = next_kwargs
+                    # Remove ourselves as its trigger
+                    task_instance.trigger_id = None
+                    # Finally, mark it as scheduled so it gets re-queued
+                    task_instance.state = State.SCHEDULED

Review Comment:
   because we're mutating the objects and probably this actually fails when the method exits.
   
   so, looking at this again, it looks like we may be able to use the retry decorator instead of the code-block context manager.  that would seemingly be cleaner, but i'll take a look





[GitHub] [airflow] NickYadance commented on pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
NickYadance commented on PR #27344:
URL: https://github.com/apache/airflow/pull/27344#issuecomment-1297953893

   > @NickYadance did you try this fix? did it resolve the issue? do you think it's possible to write a test for it? i imagine you could simulate the scenario by managing two sessions. if so, you could write a test that reliably fails without the fix, then add the fix and leave the test.
   > 
   > the reason I ask is because i see that the retry is within the scope of the session, and i'm not sure whether, if we get a deadlock error, the session will need to be rolled back.
   > 
   > and, another option would be to put `@tenacity.retry(stop=tenacity.stop_after_attempt(MAX_DB_TRIES))` as the outermost decorator, and then each failed attempt would get rolled back and a new session created for the next attempt
   
   No, actually. The solution is borrowed from here, since the cases are similar:
   https://github.com/apache/airflow/blob/b29ca4e77d4d80fb1f4d6d4b497a3a14979dd244/airflow/models/trigger.py#L100-L104
   Working on reproducing this...




[GitHub] [airflow] dstandish commented on pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27344:
URL: https://github.com/apache/airflow/pull/27344#issuecomment-1309860668

   > > @NickYadance did you try this fix? did it resolve the issue? do you think it's possible to write a test for it? i imagine you could simulate the scenario by managing two sessions. if so, you could write a test that reliably fails without the fix, then add the fix and leave the test.
   > > the reason I ask is because i see that the retry is within the scope of the session, and i'm not sure whether, if we get a deadlock error, the session will need to be rolled back.
   > > and, another option would be to put `@tenacity.retry(stop=tenacity.stop_after_attempt(MAX_DB_TRIES))` as the outermost decorator, and then each failed attempt would get rolled back and a new session created for the next attempt
   > 
   > No, actually. The solution is borrowed from here, since the cases are similar:
   > 
   > https://github.com/apache/airflow/blob/b29ca4e77d4d80fb1f4d6d4b497a3a14979dd244/airflow/models/trigger.py#L100-L104
   > 
   > Working on reproducing this...
   
   Here's an example that illustrates what I'm saying:
   
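   ```python
   from airflow.models import Log
   from airflow.settings import Session
   
   session = Session()
   for i in range(3):
       # The first two attempts use a dict (cannot be bound); the third uses a valid string.
       val = 'hi' if i == 2 else {}
       print(f"{val=}")
       try:
           session.add(Log(event=val))
           session.commit()
           break
       except Exception as e:
           # Deliberately no session.rollback() here; that omission is the point of the example.
           print(f'failure: {e}')
   ```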
   
   The line `session.add(Log(event=val))` fails when `val` is a dict but succeeds when it is a string.
   
   The first two times the loop runs, it uses a dict.  The third time it uses a string.
   
   But you will see that it doesn't matter that the third try uses a good value; it will still fail because the transaction has not been rolled back.
   
   The only difference is that the error is `(sqlite3.InterfaceError) Error binding parameter 4 - probably unsupported type` instead of a deadlock error.
   
   Now... is that a difference that makes a difference?  I am not sure.  But, I think it's likely.  If that's true, this won't work and you'll have to do the retry around the whole transaction instead of within it.
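
The idea of retrying around the whole transaction, rather than inside it, can be sketched without Airflow. The following is a minimal illustration only, not the PR's code: it uses the standard-library `sqlite3` module as a stand-in for a SQLAlchemy session, and `retry_whole_transaction`, `make_flaky_work`, and `demo` are hypothetical names. `MAX_DB_TRIES` borrows the name from the comment above, but the value 3 is an assumption. Each attempt opens a fresh connection and rolls back on failure before trying again, so a partial write from a failed attempt should never leak into the attempt that commits.

```python
import os
import sqlite3
import tempfile

MAX_DB_TRIES = 3  # assumed retry budget; the name mirrors the one used in the thread


def retry_whole_transaction(db_path, work, max_tries=MAX_DB_TRIES):
    """Run work(conn) in a fresh connection and transaction per attempt.

    Returns the 1-based number of the attempt that committed.
    """
    for attempt in range(1, max_tries + 1):
        conn = sqlite3.connect(db_path)
        try:
            work(conn)
            conn.commit()
            return attempt
        except sqlite3.Error:
            # Discard everything this attempt wrote before trying again.
            conn.rollback()
            if attempt == max_tries:
                raise
        finally:
            conn.close()


def make_flaky_work(failures):
    """Build a work function that writes one row, then fails `failures` times."""
    state = {"calls": 0}

    def work(conn):
        state["calls"] += 1
        conn.execute("INSERT INTO log (event) VALUES (?)", (f"call {state['calls']}",))
        if state["calls"] <= failures:
            # Stand-in for a deadlock raised by the database mid-transaction.
            raise sqlite3.OperationalError("simulated deadlock")

    return work


def demo():
    """Fail twice, succeed on the third attempt, and report what was committed."""
    fd, db_path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    try:
        setup = sqlite3.connect(db_path)
        setup.execute("CREATE TABLE log (event TEXT)")
        setup.commit()
        setup.close()

        attempt = retry_whole_transaction(db_path, make_flaky_work(failures=2))

        check = sqlite3.connect(db_path)
        rows = [row[0] for row in check.execute("SELECT event FROM log")]
        check.close()
        return attempt, rows
    finally:
        os.remove(db_path)
```

Calling `demo()` runs the work with two simulated failures; only the row written by the third attempt is committed, because the rows inserted by the first two attempts are rolled back along with their transactions.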




[GitHub] [airflow] dstandish commented on pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27344:
URL: https://github.com/apache/airflow/pull/27344#issuecomment-1327107144

   @NickYadance did you give up on this one?




[GitHub] [airflow] dstandish commented on pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #27344:
URL: https://github.com/apache/airflow/pull/27344#issuecomment-1295174859

   @NickYadance did you try this? did it resolve the issue?  do you think it's possible to write a test for it?  i imagine you could simulate the scenario by managing two sessions.  if so, you could write a test that reliably fails without the fix, then add the fix and leave the test.




[GitHub] [airflow] dstandish commented on a diff in pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #27344:
URL: https://github.com/apache/airflow/pull/27344#discussion_r1008211688


##########
airflow/models/trigger.py:
##########
@@ -122,17 +122,19 @@ def submit_event(cls, trigger_id, event, session=None):
         Takes an event from an instance of itself, and triggers all dependent
         tasks to resume.
         """
-        for task_instance in session.query(TaskInstance).filter(
-            TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
-        ):
-            # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
-            next_kwargs["event"] = event.payload
-            task_instance.next_kwargs = next_kwargs
-            # Remove ourselves as its trigger
-            task_instance.trigger_id = None
-            # Finally, mark it as scheduled so it gets re-queued
-            task_instance.state = State.SCHEDULED
+        for attempt in run_with_db_retries():
+            with attempt:
+                for task_instance in session.query(TaskInstance).filter(
+                    TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
+                ):
+                    # Add the event's payload into the kwargs for the task
+                    next_kwargs = task_instance.next_kwargs or {}
+                    next_kwargs["event"] = event.payload
+                    task_instance.next_kwargs = next_kwargs
+                    # Remove ourselves as its trigger
+                    task_instance.trigger_id = None
+                    # Finally, mark it as scheduled so it gets re-queued
+                    task_instance.state = State.SCHEDULED

Review Comment:
   ```suggestion
           for attempt in run_with_db_retries():
               with attempt:
                   tis = session.query(TaskInstance).filter(
                       TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
                   )
   
           for task_instance in tis:
               # Add the event's payload into the kwargs for the task
               next_kwargs = task_instance.next_kwargs or {}
               next_kwargs["event"] = event.payload
               task_instance.next_kwargs = next_kwargs
               # Remove ourselves as its trigger
               task_instance.trigger_id = None
               # Finally, mark it as scheduled so it gets re-queued
               task_instance.state = State.SCHEDULED
   ```
   
   i think separating the retry logic from the actual work makes things a little more digestible





[GitHub] [airflow] NickYadance commented on pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
NickYadance commented on PR #27344:
URL: https://github.com/apache/airflow/pull/27344#issuecomment-1327169588

   nope. i just rebased my source repo because i'm testing another issue.




[GitHub] [airflow] NickYadance closed pull request #27344: Add retry to submit_event in trigger to avoid deadlock

Posted by GitBox <gi...@apache.org>.
NickYadance closed pull request #27344: Add retry to submit_event in trigger to avoid deadlock
URL: https://github.com/apache/airflow/pull/27344

