Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/06 14:56:29 UTC

[GitHub] [airflow] potiuk opened a new pull request #21362: Avoid deadlock when rescheduling task

potiuk opened a new pull request #21362:
URL: https://github.com/apache/airflow/pull/21362


   The scheduler job performs scheduling after locking the
   DagRun for writing. This should prevent the DagRun from being modified by
   another scheduler or by the "mini-scheduler" run after a task completes.
   
   However, there is apparently one more case where the DagRun is
   locked by "Task" processes - namely when a task throws
   AirflowRescheduleException. In this case a new "TaskReschedule"
   entity is inserted into the database, and it also takes a lock
   on the DagRun (because TaskReschedule has a "DagRun" relationship).
   
   This PR modifies the handling of AirflowRescheduleException to obtain the
   very same DagRun lock before it attempts to insert the TaskReschedule
   entity.
   
   It seems that TaskReschedule is the only entity that has this relationship,
   so all the mysterious SchedulerJob deadlock cases we
   have experienced might be explained (and fixed) by this one.
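   
   In essence, the reschedule handler (TaskInstance._handle_reschedule) now
   re-selects the DagRun row with a row-level lock before inserting the
   TaskReschedule. A condensed sketch of the change (the full diff is quoted
   in the review discussion below; the with_row_locks import path is assumed
   to be the one taskinstance.py already uses):
   
   ```
   from airflow.models.dagrun import DagRun  # local import to avoid a circular import
   from airflow.utils.sqlalchemy import with_row_locks
   
   # Take the same DagRun row lock the scheduler takes, so the subsequent
   # TaskReschedule INSERT cannot deadlock against a scheduler transaction
   # that already holds (or is waiting on) the DagRun row.
   with_row_locks(
       session.query(DagRun).filter_by(dag_id=self.dag_id, run_id=self.run_id),
       session=session,
   ).one()
   ```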
   
   It is likely that this one:
   
   * Fixes: #16982
   * Fixes: #19957
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   





[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   I believe one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
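   
   For reference, a minimal standalone sketch (plain SQLAlchemy, with a hypothetical model and an in-memory database just for illustration) of why `.one()` fails loudly right here while `.one_or_none()` would silently defer the failure to the later TaskReschedule insert:
   
   ```
   from sqlalchemy import Column, Integer, String, create_engine
   from sqlalchemy.orm import Session, declarative_base
   from sqlalchemy.orm.exc import NoResultFound
   
   Base = declarative_base()
   
   class Run(Base):  # hypothetical stand-in for DagRun
       __tablename__ = "run"
       id = Column(Integer, primary_key=True)
       run_id = Column(String)
   
   engine = create_engine("sqlite://")
   Base.metadata.create_all(engine)
   
   with Session(engine) as session:
       query = session.query(Run).filter_by(run_id="missing")
       print(query.one_or_none())  # None - the problem is silently deferred
       try:
           query.one()             # raises NoResultFound - fails right here
       except NoResultFound as exc:
           print(f"failed loudly: {exc}")
   ```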
   







[GitHub] [airflow] potiuk edited a comment on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030853621


   How I got to that: 
   
   Gist here: https://gist.github.com/tulanowski/fcc8358bad3c8e5d15678639ec015d8b
   
   Query 1 (just a fragment of it) - this is the scheduler trying to get the task instances to consider for scheduling:
   
   ```
   Failed to execute task (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
    task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS
   dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   ```
   
   It corresponds to this query (TRANSACTION 1 from the "server log"):
   
   ```
   SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
   task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date 
   AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS 
   .... 
   ```
   
   Query 2: This is the TaskReschedule insert, which only happens (as far as I checked) when AirflowRescheduleException is thrown during task execution (TRANSACTION 2 from the server log):
   
   ```
   INSERT INTO task_reschedule (task_id, dag_id, run_id, try_number, start_date, end_date, duration, reschedule_date)
   VALUES ('raw_sensor.raw_input_shop.ipad_orders_from_popup_store', 'dwh_core.main',
   'scheduled__2022-01-27T00:15:00+00:00', 1, '2022-01-28 02:16:08.000687', '2022-01-28 02:16:08.142040', 0.141353e0, '2022-01-28 02:17:08.129220')
   ```





[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1031621645


   > Then please don't add me again! By still using @ephraim!
   
   Ah sorry. Really. Copy&paste. Really sorry!





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800809192



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       Does it look like this will throw an error if we can't obtain a lock?







[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   Also I think one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble. The whole point of this row is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to obtain the same DagRun row (and wait for the same lock).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk edited a comment on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030853621


   How I got to that: 
   
   Gist here: https://gist.github.com/tulanowski/fcc8358bad3c8e5d15678639ec015d8b
   
   Query 1 (just a fragment of it) - this is the scheduler trying to get the task instances to consider for scheduling:
   
   ```
   Failed to execute task (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
    task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS
   dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   ```
   
   It corresponds to this query (TRANSACTION 1 from the "server log"):
   
   ```
   SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
   task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date 
   AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS 
   .... 
   ```
   
   Query 2: This is the TaskReschedule insert, which only happens (as far as I checked) when AirflowRescheduleException is thrown during task execution (TRANSACTION 2 from the server log):
   
   ```
   INSERT INTO task_reschedule (task_id, dag_id, run_id, try_number, start_date, end_date, duration, reschedule_date)
   VALUES ('raw_sensor.raw_input_shop.ipad_orders_from_popup_store', 'dwh_core.main',
   'scheduled__2022-01-27T00:15:00+00:00', 1, '2022-01-28 02:16:08.000687', '2022-01-28 02:16:08.142040', 0.141353e0, '2022-01-28 02:17:08.129220')
   ```
   
   Both of them are waiting for this lock:
   
   ```
   *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
   RECORD LOCKS space id 577 page no 2196 n bits 296 index dag_run_dag_id_run_id_key of table `airflow`.`dag_run` trx id 844106125 lock mode S locks rec but not gap waiting
   Record lock, heap no 230 PHYSICAL RECORD: n_fields 3; compact format; info bits 0
    0: len 13; hex 6477685f636f72652e6d61696e; asc dwh_core.main;;
    1: len 30; hex 7363686564756c65645f5f323032322d30312d32375430303a31353a3030; asc scheduled__2022-01-27T00:15:00; (total 36 bytes);
    2: len 4; hex 800198fa; asc     ;;
   ```
   
   How I understand this: this is a lock on the index of the dag_run primary key, which needs to be updated because we are inserting a row into TaskReschedule; and because of the "DagRun" relationship in the TaskReschedule object, this index needs to be locked when a TaskReschedule related to the same dag_run_id needs to be updated.
   
   So what I think happens:
   
   1) The scheduler locks the DagRun entity (but not the dag_run index!)
   2) The task throws AirflowRescheduleException, tries to insert 'task_reschedule', and attempts to get the index lock (but cannot get it, because that also needs the DagRun entity lock, so it waits for it)
   3) The UPDATE of task_instance then tries to get the index lock - but it can't, because the insert already has it
   
   Classic deadlock.
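   
   A generic sketch of the principle behind the fix (illustrative only, not Airflow code; the names are made up): every transaction that is about to insert a child row takes the parent row lock first, so two transactions can never acquire the parent/child locks in opposite orders:
   
   ```
   from sqlalchemy import select
   
   def insert_child_with_parent_lock(session, Parent, Child, parent_id, **child_cols):
       # 1. Lock the parent row first (blocks until any other holder commits),
       #    mirroring what the scheduler does with the DagRun row.
       session.execute(
           select(Parent).where(Parent.id == parent_id).with_for_update()
       ).one()
       # 2. Only then insert the child row referencing the parent,
       #    mirroring the TaskReschedule insert.
       session.add(Child(parent_id=parent_id, **child_cols))
   ```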
   
   





[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030849164


   @ashb @ephraim @uranusjr @BasPH - I think I have finally found - thanks to the "server log" provided by our user - the reason for the deadlock that has been plaguing some users (the nature of the scenario that causes it also explains why it is so rare and why only some users experience it). Please take a look.
   
   The gist with log that led me to this hypothesis/solution is here:  https://github.com/apache/airflow/issues/16982#issuecomment-1024136484





[GitHub] [airflow] stablum commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
stablum commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1080263884


   Hello,
   
   unfortunately I'm still getting deadlocks: https://github.com/apache/airflow/issues/19957#issuecomment-1079965417





[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030852222


   Hey @jedcunningham - if this one is confirmed to fix the deadlock issue, I think it is a very good candidate for 2.2.4 - it's very small and IMHO not at all risky (the worst that can happen is slightly slower rescheduling when AirflowRescheduleException is thrown), and it solves a really nasty edge case that cannot be worked around otherwise (if my hypothesis is confirmed, that is).





[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   Also I think one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to obtain the same DagRun row (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   I believe one_or_none() is not good here. We insert `TaskReschedule` which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a `TaskReschedule` that will not have a `DagRun`, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep, deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   Also I think one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk edited a comment on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030853621


   How I got to that: 
   
   Query 1 (just a fragment of it) - this is the scheduler trying to get the task instances to consider for scheduling:
   
   ```
   Failed to execute task (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
    task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS
   dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   ```
   
   It corresponds to this query (TRANSACTION 1 from the "server log"):
   
   ```
   SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
   task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date 
   AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS 
   .... 
   ```
   
   Query 2: This is the TaskReschedule insert, which only happens (as far as I checked) when AirflowRescheduleException is thrown during task execution (TRANSACTION 2 from the server log):
   
   ```
   INSERT INTO task_reschedule (task_id, dag_id, run_id, try_number, start_date, end_date, duration, reschedule_date)
   VALUES ('raw_sensor.raw_input_shop.ipad_orders_from_popup_store', 'dwh_core.main',
   'scheduled__2022-01-27T00:15:00+00:00', 1, '2022-01-28 02:16:08.000687', '2022-01-28 02:16:08.142040', 0.141353e0, '2022-01-28 02:17:08.129220')
   ```





[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030909570


   > You most probably did not mean me. Ephraim... T. Ephraim perhaps or so... please double check who you tag!
   
   Yep, sorry. Happens. I meant @ephraimbuddy. Feel free to mute that discussion.





[GitHub] [airflow] ephraim commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
ephraim commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1031617221


   > > You most probably did not mean me. Ephraim... T. Ephraim perhaps or so... please double check who you tag!
   > 
   > Yep, sorry. Happens. I meant @ephraimbuddy. Feel free to mute that discussion.
   
   Then please don't add me again! By still using @ephraim!





[GitHub] [airflow] ephraim commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
ephraim commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030907422


   You most probably did not mean me. Ephraim... T. Ephraim perhaps or so... please double check who you tag!





[GitHub] [airflow] potiuk edited a comment on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030849164


   @ashb @ephraim @uranusjr @BasPH - I think I have finally found - thanks to the "server log" provided by our user - the reason for the deadlock that has been plaguing some users (the nature of the scenario that causes it also explains why it is so rare and why only some users experience it). Please take a look.
   
   The gist with log that led me to this hypothesis/solution is here:  https://github.com/apache/airflow/issues/16982#issuecomment-1024136484





[GitHub] [airflow] potiuk merged pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #21362:
URL: https://github.com/apache/airflow/pull/21362


   





[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   I believe one_or_none() is not good here. We insert `TaskReschedule` which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a `TaskReschedule` that will not have a `DagRun`, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   Also I think one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to obtain the same DagRun row (and wait for the same lock).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1031593645


   @ashb @ephraim @uranusjr @jedcunningham - a keen look at this might be useful; it would be cool to get this one in before 2.2.4 (if we have a consensus that it looks like a plausible explanation + fix).





[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   Also I think one_or_none() is not good here. We insert TaskReschedule which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a TaskReschedule that will not have a DagRun, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800911499



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we should always obtain a lock here. And I think the only error that can happen here is the same as in Scheduler or Mini-Scheduler (we already do exactly the same call there). The error that might happen here is if we cannot obtain the lock for a "long time" I think. Which should not happen usually unless someone locks and does not free the lock (manual DB query). 
   
   I believe one_or_none() is not good here. We insert `TaskReschedule` which (as I understand) MUST have a corresponding entry in `DagRun`. So if we try to insert a `TaskReschedule` that will not have a `DagRun`, it will fail because of the relationship with `DagRun` and the inability to have a foreign key to it. So if we have "none" returned here we are in deep, deep trouble.
   
   Also, the whole point of this operation is to obtain the lock on DagRun (same as the scheduler and mini-scheduler do) so that any operations on the related task instances are "protected" from others who want to modify the same DagRun (and wait for the lock we already have).
   
   I just literally copy&pasted it from there
   
   ```
       @provide_session
       @Sentry.enrich_errors
       def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
           try:
               # Re-select the row with a lock
               dag_run = with_row_locks(
                   session.query(DagRun).filter_by(
                       dag_id=self.dag_id,
                       run_id=self.task_instance.run_id,
                   ),
                   session=session,
               ).one()
   ```
   
   In the mini-scheduler we do try/except OperationalError, but I think this only makes sense because the mini-scheduler is really "optional" and any error can be safely ignored there (and the session rolled back).
   
   ```
           except OperationalError as e:
               # Any kind of DB error here is _non fatal_ as this block is just an optimisation.
               self.log.info(
                   "Skipping mini scheduling run due to exception: %s",
                   e.statement,
                   exc_info=True,
               )
               session.rollback()
   ```
   
   But I think in this case this would not be appropriate. Inserting the TaskReschedule here is not optional, so I prefer the error to propagate up to the top and FAIL the task instead (that's what would happen if we just let the exception propagate). I think this is a good behaviour.
   







[GitHub] [airflow] potiuk edited a comment on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1031593645






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1031777303


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#issuecomment-1030853621


   How I got to that: 
   
   Query 1 (just a fragment of it) - this is the scheduler trying to get the task instances to consider for scheduling:
   
   ```
   Failed to execute task (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
   [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
    task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS
   dag_run_1.dag_hash AS dag_run_1_dag_hash
   FROM task_instance INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
   WHERE task_instance.dag_id = %s AND task_instance.task_id = %s AND task_instance.run_id = %s
    LIMIT %s FOR UPDATE]
   ```
   
   It corresponds to this query (TRANSACTION 1) from the "server log":
   
   ```
   SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id,
   task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date 
   AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS 
   .... 
   ```
   
   Query 2: This is the TaskReschedule insert, which only happens (as far as I checked) when AirflowRescheduleException is thrown during task execution (TRANSACTION 2 from the server log):
   
   ```
   INSERT INTO task_reschedule (task_id, dag_id, run_id, try_number, start_date, end_date, duration, reschedule_date)
   VALUES ('raw_sensor.raw_input_shop.ipad_orders_from_popup_store', 'dwh_core.main',
   'scheduled__2022-01-27T00:15:00+00:00', 1, '2022-01-28 02:16:08.000687', '2022-01-28 02:16:08.142040', 0.141353e0, '2022-01-28 02:17:08.129220')
   ```





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21362: Avoid deadlock when rescheduling task

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21362:
URL: https://github.com/apache/airflow/pull/21362#discussion_r800810004



##########
File path: airflow/models/taskinstance.py
##########
@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return
+
+        from airflow.models.dagrun import DagRun  # Avoid circular import
+
         self.refresh_from_db(session)
 
         self.end_date = timezone.utcnow()
         self.set_duration()
 
+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

Review comment:
       I think we can use `one_or_none()` instead?



