Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/29 19:46:11 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #13920: Bugfix: Don't try to create a duplicate Dag Run in Scheduler

kaxil commented on a change in pull request #13920:
URL: https://github.com/apache/airflow/pull/13920#discussion_r567055461



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1571,16 +1585,25 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
                 continue
 
             dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
-            dag.create_dagrun(
-                run_type=DagRunType.SCHEDULED,
-                execution_date=dag_model.next_dagrun,
-                start_date=timezone.utcnow(),
-                state=State.RUNNING,
-                external_trigger=False,
-                session=session,
-                dag_hash=dag_hash,
-                creating_job_id=self.id,
-            )
+            # Explicitly check if the DagRun already exists. This is an edge case
+            # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
+            # are not updated.
+            # We opted to check DagRun existence instead
+            # of catching an IntegrityError and rolling back the session, because
+            # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+            # create a new one. This is so that in the next scheduling loop we try to create new runs
+            # instead of falling into a loop of IntegrityErrors.
+            if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:

Review comment:
       `dag_model.next_dagrun` isn't updated, so it still holds the old value, which is exactly what we want to check: that we don't try to recreate the same Dag Run. Otherwise the insert fails with a unique constraint violation error.
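To illustrate why the existence check matters, here is a minimal sketch using Python's built-in sqlite3 module. It assumes, as the comment implies, that runs are unique per (dag_id, execution_date); the table `dag_run_demo` and the helper `create_run_if_missing` are hypothetical and are not Airflow's schema or API.

```python
import sqlite3

# Hypothetical stand-in for the dag run table: one row per (dag_id, execution_date).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run_demo ("
    " dag_id TEXT NOT NULL,"
    " execution_date TEXT NOT NULL,"
    " UNIQUE (dag_id, execution_date))"
)


def create_run_if_missing(conn, dag_id, execution_date):
    """Insert a run only if it does not already exist; return True if inserted."""
    exists = conn.execute(
        "SELECT 1 FROM dag_run_demo WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    if exists:
        return False
    conn.execute(
        "INSERT INTO dag_run_demo (dag_id, execution_date) VALUES (?, ?)",
        (dag_id, execution_date),
    )
    return True


# The first attempt creates the run.
first = create_run_if_missing(conn, "example_dag", "2021-01-29T00:00:00")
# A stale next_dagrun points at the same date; the check skips it instead of failing.
second = create_run_if_missing(conn, "example_dag", "2021-01-29T00:00:00")

# Without the check, a blind INSERT of the same key violates the unique constraint.
try:
    conn.execute(
        "INSERT INTO dag_run_demo (dag_id, execution_date) VALUES (?, ?)",
        ("example_dag", "2021-01-29T00:00:00"),
    )
    duplicate_error = None
except sqlite3.IntegrityError as exc:
    duplicate_error = str(exc)

print(first, second, duplicate_error)
```

A check-then-insert like this is not atomic on its own; with concurrent writers, the database constraint remains the final safeguard.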


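The control flow described in the code comment (skip creation when the run already exists, but advance the next-run fields in either case) can be sketched in plain Python. `DagModelStub`, `create_dag_runs_sketch`, and the fixed `interval` are hypothetical simplifications; in the actual change the fields are updated by `self._update_dag_next_dagruns`, which this sketch does not reproduce.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class DagModelStub:
    """Hypothetical stand-in for DagModel with only the fields this sketch needs."""
    dag_id: str
    next_dagrun: datetime
    interval: timedelta  # hypothetical fixed schedule interval


def create_dag_runs_sketch(dag_models, active_dagruns, created_runs):
    """Create missing runs, then always advance next_dagrun.

    active_dagruns: set of (dag_id, execution_date) pairs that already exist.
    created_runs: list collecting the (dag_id, execution_date) pairs created here.
    """
    for dag_model in dag_models:
        key = (dag_model.dag_id, dag_model.next_dagrun)
        if key not in active_dagruns:
            created_runs.append(key)
            active_dagruns.add(key)
        # Advance in both branches so the next loop does not retry the same date.
        dag_model.next_dagrun += dag_model.interval


start = datetime(2021, 1, 29)
model = DagModelStub("example_dag", start, timedelta(days=1))
# Simulate the edge case: the run exists, but next_dagrun was never advanced.
existing = {("example_dag", start)}
created = []

create_dag_runs_sketch([model], existing, created)  # skips the duplicate run
create_dag_runs_sketch([model], existing, created)  # creates the following run
print(created, model.next_dagrun)
```

Advancing `next_dagrun` even when creation is skipped is what keeps the scheduler from hitting the same duplicate on every loop.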


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org