Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/01 13:57:39 UTC

[GitHub] [airflow] ashb commented on a change in pull request #16182: Do not queue tasks when the DAG file goes missing

ashb commented on a change in pull request #16182:
URL: https://github.com/apache/airflow/pull/16182#discussion_r643122857



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                if ti.state not in State.finished:
+                    ti.set_state(State.FAILED, session=session)

Review comment:
       There is a "removed" state that would perhaps be more appropriate here.
   
   (Though the removed state is not in `State.finished`, and it probably should be.)
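A minimal sketch of this suggestion. It uses a hypothetical, trimmed-down state enum (`TIState`) and terminal-state set (`FINISHED`) rather than Airflow's real `State` class. Task instances whose DAG file is gone are moved to a removed state instead of failed. Because this sketch's terminal set includes the removed state, as the comment proposes, running the check again leaves already-removed task instances untouched.

```python
from enum import Enum


class TIState(str, Enum):
    """Hypothetical, trimmed-down task-instance states (not Airflow's real State class)."""
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    REMOVED = "removed"


# Terminal states for this sketch. REMOVED is included here, as the comment
# says it probably should be in `State.finished`.
FINISHED = frozenset({
    TIState.SUCCESS,
    TIState.FAILED,
    TIState.SKIPPED,
    TIState.UPSTREAM_FAILED,
    TIState.REMOVED,
})


def state_for_missing_dag(state):
    """Return the new state for a task instance whose DAG file is missing."""
    # Leave terminal task instances alone, including ones already removed.
    if state in FINISHED:
        return state
    # Otherwise mark it removed rather than failed.
    return TIState.REMOVED


print(state_for_missing_dag(TIState.QUEUED).value)   # removed
print(state_for_missing_dag(TIState.SUCCESS).value)  # success
```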

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue

Review comment:
       Loading the full DAG is an expensive operation, and nothing in here actually needs the DAG object, so I think this calls for a new `has_dag` method on the DagBag. It would first check whether the DAG exists in its local cache and, if not, check whether the DB row exists. Crucially, it doesn't actually load the full row!
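The proposed `has_dag` could look roughly like the following sketch. `DagBagSketch`, its `dags` cache dict, and the two-column `serialized_dag` table are simplified, hypothetical stand-ins. `sqlite3` replaces the scheduler's real SQLAlchemy session. The point is the order of the checks: the in-process cache is consulted first. If the DAG is not cached, the query selects a constant, so the serialized payload is never fetched or deserialized.

```python
import sqlite3


class DagBagSketch:
    """Hypothetical DagBag stand-in; `has_dag` is the proposed method, not an existing API."""

    def __init__(self, conn):
        self.conn = conn
        self.dags = {}  # local cache: dag_id -> fully loaded DAG object

    def has_dag(self, dag_id):
        # 1. Cheap check: is the DAG already loaded in this process?
        if dag_id in self.dags:
            return True
        # 2. Ask the DB only whether a row exists. Selecting a constant means
        #    the (potentially large) serialized `data` column is not returned.
        row = self.conn.execute(
            "SELECT 1 FROM serialized_dag WHERE dag_id = ? LIMIT 1", (dag_id,)
        ).fetchone()
        return row is not None


# Usage against a throwaway, simplified schema (the real table has more columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")
conn.execute(
    "INSERT INTO serialized_dag VALUES (?, ?)", ("example_dag", '{"big": "payload"}')
)
bag = DagBagSketch(conn)
print(bag.has_dag("example_dag"))  # True
print(bag.has_dag("missing_dag"))  # False
```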

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)

Review comment:
       No need to use the dag object here.
   
   ```suggestion
                   dagrun = ti.get_dagrun(session)
   ```
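The suggestion works because a task instance already carries the two values that identify its run: `dag_id` and `execution_date`. The following sketch shows that lookup against a simplified, hypothetical `dag_run` table. It uses plain `sqlite3` rather than Airflow's ORM, and `TaskInstanceSketch` and `get_dagrun_for_ti` are illustrative names. No DAG object is involved.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class TaskInstanceSketch:
    """Hypothetical stand-in holding the fields that identify a task instance's run."""
    dag_id: str
    task_id: str
    execution_date: str  # ISO-8601 text, to keep the SQLite schema simple


def get_dagrun_for_ti(conn, ti):
    """Look up the run for `ti` using only values stored on the task instance."""
    return conn.execute(
        "SELECT run_id, state FROM dag_run WHERE dag_id = ? AND execution_date = ?",
        (ti.dag_id, ti.execution_date),
    ).fetchone()


# Usage against a throwaway, simplified schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, run_id TEXT, state TEXT)"
)
conn.execute(
    "INSERT INTO dag_run VALUES (?, ?, ?, ?)",
    ("example_dag", "2021-06-01T00:00:00+00:00", "run_1", "running"),
)
ti = TaskInstanceSketch("example_dag", "task_a", "2021-06-01T00:00:00+00:00")
print(get_dagrun_for_ti(conn, ti))  # ('run_1', 'running')
```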

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.

Review comment:
       ```suggestion
   
           Return task instances that exists in SerializedDag table as well as dags folder.
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -901,6 +901,33 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
 
+    def _remove_tis_with_missing_dag(self, task_instances, session=None):
+        """
+        Fail task instances and the corresponding DagRun if the dag can't be found in
+        the dags folder but exists in SerializedDag table.
+        Return task instances that exists in SerializedDag table as well as dags folder.
+        If the dag can't be found in DagBag, just return the task instance. This is common in
+        unittest where subdir is os.devnull
+        """
+        tis = []
+        for ti in task_instances:
+            try:
+                dag = self.dagbag.get_dag(ti.dag_id, session=session)
+            except SerializedDagNotFound:
+                tis.append(ti)
+                continue
+            if os.path.exists(dag.fileloc):
+                tis.append(ti)
+            else:
+                dagrun = dag.get_dagrun(execution_date=ti.execution_date, session=session)
+                if ti.state not in State.finished:
+                    ti.set_state(State.FAILED, session=session)
+                    self.log.error("Failing task: %s because DAG: %s is missing", ti.task_id, ti.dag_id)
+                if dagrun.state not in State.finished:
+                    dagrun.set_state(State.FAILED)

Review comment:
       If the final tasks in a DagRun are still running, it probably doesn't make sense to fail the run immediately, as it could still complete.
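One way to act on this comment is sketched below under stated assumptions. The helper `next_dagrun_state` and the plain-string states are hypothetical. It takes the states of every task instance in the run and treats only the running state as one that could still finish. The run is failed only once none of its task instances is running. Otherwise it is left as it is, to be revisited on a later scheduler loop.

```python
# Hypothetical plain-string states; Airflow's real State constants may differ.
RUNNING = "running"
FAILED = "failed"
FINISHED_RUN_STATES = frozenset({"success", FAILED})


def next_dagrun_state(run_state, ti_states):
    """Return the state to give a DagRun whose DAG file has gone missing."""
    # A run that has already finished keeps its state.
    if run_state in FINISHED_RUN_STATES:
        return run_state
    # A task that is still executing may succeed and let the run complete,
    # so defer the decision instead of failing the run immediately.
    if any(state == RUNNING for state in ti_states):
        return run_state
    # Nothing left that could finish: fail the run.
    return FAILED


print(next_dagrun_state("running", ["success", "running"]))  # running
print(next_dagrun_state("running", ["success", "removed"]))  # failed
```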




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org