You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/10/02 03:34:08 UTC
[airflow] branch main updated: Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0da4993500 Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
0da4993500 is described below
commit 0da49935000476b1d1941b63d0d66d3c58d64fea
Author: Anthony Panat <ap...@ycharts.com>
AuthorDate: Sat Oct 1 23:33:59 2022 -0400
Retry on Airflow Schedule DAG Run DB Deadlock (#26347)
Co-authored-by: Anthony Panat <an...@Anthonys-MacBook-Pro-2.local>
Co-authored-by: Anthony Panat <an...@anthonys-mbp-2.mynetworksettings.com>
---
airflow/jobs/scheduler_job.py | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 53a96cf9ac..74541a1cf8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -943,12 +943,7 @@ class SchedulerJob(BaseJob):
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun
- callback_tuples = []
- for dag_run in dag_runs:
- callback_to_run = self._schedule_dag_run(dag_run, session)
- callback_tuples.append((dag_run, callback_to_run))
-
- guard.commit()
+ callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
# Send the callbacks after we commit to ensure the context is up to date when it gets run
for dag_run, callback_to_run in callback_tuples:
@@ -1232,6 +1227,18 @@ class SchedulerJob(BaseJob):
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)
+ @retry_db_transaction
+ def _schedule_all_dag_runs(self, guard, dag_runs, session):
+ """Makes scheduling decisions for all `dag_runs`"""
+ callback_tuples = []
+ for dag_run in dag_runs:
+ callback_to_run = self._schedule_dag_run(dag_run, session)
+ callback_tuples.append((dag_run, callback_to_run))
+
+ guard.commit()
+
+ return callback_tuples
+
def _schedule_dag_run(
self,
dag_run: DagRun,