You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/02 03:34:08 UTC

[airflow] branch main updated: Retry on Airflow Schedule DAG Run DB Deadlock (#26347)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0da4993500 Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
0da4993500 is described below

commit 0da49935000476b1d1941b63d0d66d3c58d64fea
Author: Anthony Panat <ap...@ycharts.com>
AuthorDate: Sat Oct 1 23:33:59 2022 -0400

    Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
    
    
    
    Co-authored-by: Anthony Panat <an...@Anthonys-MacBook-Pro-2.local>
    Co-authored-by: Anthony Panat <an...@anthonys-mbp-2.mynetworksettings.com>
---
 airflow/jobs/scheduler_job.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 53a96cf9ac..74541a1cf8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -943,12 +943,7 @@ class SchedulerJob(BaseJob):
             # Bulk fetch the currently active dag runs for the dags we are
             # examining, rather than making one query per DagRun
 
-            callback_tuples = []
-            for dag_run in dag_runs:
-                callback_to_run = self._schedule_dag_run(dag_run, session)
-                callback_tuples.append((dag_run, callback_to_run))
-
-            guard.commit()
+            callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
 
         # Send the callbacks after we commit to ensure the context is up to date when it gets run
         for dag_run, callback_to_run in callback_tuples:
@@ -1232,6 +1227,18 @@ class SchedulerJob(BaseJob):
                 active_runs_of_dags[dag_run.dag_id] += 1
                 _update_state(dag, dag_run)
 
+    @retry_db_transaction
+    def _schedule_all_dag_runs(self, guard, dag_runs, session):
+        """Makes scheduling decisions for all `dag_runs`"""
+        callback_tuples = []
+        for dag_run in dag_runs:
+            callback_to_run = self._schedule_dag_run(dag_run, session)
+            callback_tuples.append((dag_run, callback_to_run))
+
+        guard.commit()
+
+        return callback_tuples
+
     def _schedule_dag_run(
         self,
         dag_run: DagRun,