Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/06 07:25:10 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #22701: Fixed backfill interference with scheduler

uranusjr commented on code in PR #22701:
URL: https://github.com/apache/airflow/pull/22701#discussion_r843561802


##########
airflow/jobs/backfill_job.py:
##########
@@ -782,6 +784,28 @@ def _execute(self, session=None):
                 return
             dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]
 
+        dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
+        running_dagruns = DagRun.find(
+            dag_id=dag_with_subdags_ids,
+            execution_start_date=self.bf_start_date,
+            execution_end_date=self.bf_end_date,
+            no_backfills=True,
+            state=State.RUNNING,
+        )
+
+        if running_dagruns:
+            for run in running_dagruns:
+                self.log.error(
+                    "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
+                    "state. Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
+                    "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
+                    "database would cause database constraint violation for dag_id + execution_date "
+                    "combination. Please adjust backfill dates or wait for this DagRun to finish.",
+                    run.dag_id,
+                    run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
+                    run.run_type,
+                )

Review Comment:
   Should we only log one error for all runs, or must we log one for each?
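   As a rough illustration of the "one error for all runs" option, here is a minimal sketch. It is not the PR's code: `FakeRun`, `summarize_running_dagruns`, and `report_conflicts` are hypothetical names, and `FakeRun` only stands in for the three `DagRun` attributes the message uses.

```python
import logging
from dataclasses import dataclass
from datetime import datetime

log = logging.getLogger("backfill_sketch")


@dataclass
class FakeRun:
    """Hypothetical stand-in for the DagRun attributes used in the message."""

    dag_id: str
    execution_date: datetime
    run_type: str


def summarize_running_dagruns(runs):
    """Describe every conflicting run in a single comma-separated string."""
    return ", ".join(
        f"{run.dag_id} @ {run.execution_date.strftime('%Y-%m-%dT%H:%M:%S')} ({run.run_type})"
        for run in runs
    )


def report_conflicts(runs):
    """Log one error covering all conflicting runs; return True if any exist."""
    if not runs:
        return False
    log.error(
        "Backfill cannot be created: %d DagRun(s) are already in a RUNNING state: %s. "
        "Please adjust backfill dates or wait for these DagRuns to finish.",
        len(runs),
        summarize_running_dagruns(runs),
    )
    return True
```

   Logging per run, as the diff does, keeps each message short; a single aggregated message avoids repeating the long explanation once per run.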



##########
airflow/jobs/backfill_job.py:
##########
@@ -782,6 +784,28 @@ def _execute(self, session=None):
                 return
             dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]
 
+        dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
+        running_dagruns = DagRun.find(
+            dag_id=dag_with_subdags_ids,
+            execution_start_date=self.bf_start_date,
+            execution_end_date=self.bf_end_date,
+            no_backfills=True,
+            state=State.RUNNING,

Review Comment:
   Please prefer `DagRunState` over `State` here. The same applies to the other changes. (For `State.NONE`, simply use `None` instead.)
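   A minimal sketch of what that could look like for the `DagRun.find` arguments. To stay runnable without Airflow installed, it defines a local stand-in enum that is assumed to mirror `airflow.utils.state.DagRunState`; `build_find_kwargs` is a hypothetical helper, not something in the PR.

```python
from enum import Enum


class DagRunState(str, Enum):
    """Local stand-in, assumed to mirror airflow.utils.state.DagRunState.

    In Airflow itself you would instead write:
        from airflow.utils.state import DagRunState
    """

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def build_find_kwargs(dag_ids, start_date, end_date, state=DagRunState.RUNNING):
    """Hypothetical helper: collect the keyword arguments shown in the diff.

    Pass state=None where the old code would have used State.NONE.
    """
    return {
        "dag_id": dag_ids,
        "execution_start_date": start_date,
        "execution_end_date": end_date,
        "no_backfills": True,
        "state": state,
    }
```

   Because the enum subclasses `str`, `DagRunState.RUNNING == "running"` still holds, so comparisons against plain state strings keep working.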


