Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/31 21:28:58 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

ephraimbuddy commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r642681719



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,32 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    # pylint: disable=too-many-nested-blocks
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exist in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+                if os.path.exists(dag.fileloc):
+                    tis.append(ti)
+                else:
+                    dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                    if ti.state not in State.finished:
+                        ti.set_state(State.FAILED, session=session)
+                        self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                    if dagrun.state not in State.finished:
+                        dagrun.set_state(State.FAILED)
+            except SerializedDagNotFound:
+                tis.append(ti)
+        return tis

Review comment:
       OK, I will try that.
   `SerializedDagNotFound` is raised by the call `dag = self.dagbag.get_dag(ti.dag_id, session=session)`.
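
   Since the exception comes only from that one call, one possible shape is to keep the `try` around the lookup alone and do the file check outside it. The snippet below is only a rough sketch, not the code in this PR: `FakeDag`, `FakeDagBag` and `split_by_dag_file` are hypothetical stand-ins, `SerializedDagNotFound` is redefined locally so the example runs without Airflow, and the state changes on the task instance and DagRun are left out.

```python
import os


class SerializedDagNotFound(Exception):
    """Local stand-in for Airflow's exception, defined here so the sketch is self-contained."""


class FakeDag:
    """Hypothetical stub carrying only the attribute the check needs."""

    def __init__(self, fileloc):
        self.fileloc = fileloc


class FakeDagBag:
    """Hypothetical stub: maps dag_id to FakeDag and raises for unknown ids."""

    def __init__(self, dags):
        self._dags = dags

    def get_dag(self, dag_id):
        try:
            return self._dags[dag_id]
        except KeyError:
            raise SerializedDagNotFound(dag_id) from None


def split_by_dag_file(dagbag, dag_ids):
    """Return (keep, missing): ids to pass on, and ids whose DAG file is gone."""
    keep, missing = [], []
    for dag_id in dag_ids:
        # Only the lookup is guarded, because that is where the exception is raised.
        try:
            dag = dagbag.get_dag(dag_id)
        except SerializedDagNotFound:
            # No serialized DAG at all: pass the id through unchanged.
            keep.append(dag_id)
            continue
        if os.path.exists(dag.fileloc):
            keep.append(dag_id)
        else:
            # Serialized DAG exists but its file does not: candidate for failing.
            missing.append(dag_id)
    return keep, missing
```

   With this layout an unrelated error raised while handling the missing-file branch would not be caught by the `except SerializedDagNotFound` clause by accident.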
   



