Posted to commits@airflow.apache.org by je...@apache.org on 2022/02/17 21:39:08 UTC

[airflow] 17/43: Fix Scheduler crash when executing task instances of missing DAG (#20349)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b139a77d012971d147bffc74646923514db4b48
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jan 13 22:23:10 2022 +0100

    Fix Scheduler crash when executing task instances of missing DAG (#20349)
    
    When executing task instances, we do not check whether the dag is
    missing from the dagbag. This PR fixes it by failing and skipping task
    instances whose dag cannot be found in the serialized dag table.
    
    Closes: #20099
    (cherry picked from commit 98715760f72e5205c291293088b5e79636884491)
---
 airflow/jobs/scheduler_job.py    | 11 +++++++++++
 tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)
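
For context on the crash itself: once a DAG was deleted after its task instances had been scheduled, self.dagbag.get_dag() returned None, and the very next call on that value (serialized_dag.has_task(), visible in the hunk below) raised AttributeError and took the scheduler down. Below is a standalone sketch of the None-guard pattern the fix introduces; FakeDagBag and queue_task_instances are illustrative stand-ins, not Airflow APIs:

    from typing import Dict, List, Optional

    class FakeDagBag:
        """Stand-in for the scheduler's dagbag; unknown dag_ids yield None."""

        def __init__(self, dags: Dict[str, object]) -> None:
            self._dags = dags

        def get_dag(self, dag_id: str) -> Optional[object]:
            return self._dags.get(dag_id)

    def queue_task_instances(dagbag: FakeDagBag, tis: List[dict]) -> List[dict]:
        queued = []
        for ti in tis:
            if dagbag.get_dag(ti["dag_id"]) is None:
                # The fix in a nutshell: mark the orphaned task instance
                # failed and move on, instead of dereferencing None.
                ti["state"] = "failed"
                continue
            queued.append(ti)
        return queued

    tis = [{"dag_id": "deleted_dag", "task_id": "dummy", "state": "scheduled"}]
    assert queue_task_instances(FakeDagBag({}), tis) == []
    assert tis[0]["state"] == "failed"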

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2fedf80..490d507 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -375,6 +375,17 @@ class SchedulerJob(BaseJob):
                     # Many dags don't have a task_concurrency, so the more we can avoid loading
                     # the full serialized DAG, the better.
                     serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                    # If the dag is missing, fail all of its scheduled task instances and continue.
+                    if not serialized_dag:
+                        self.log.error(
+                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                            dag_id,
+                            task_instance,
+                        )
+                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                            {TI.state: State.FAILED}, synchronize_session='fetch'
+                        )
+                        continue
                     if serialized_dag.has_task(task_instance.task_id):
                         task_concurrency_limit = serialized_dag.get_task(
                             task_instance.task_id
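
A note on synchronize_session='fetch' in the hunk above: Query.update() issues a bulk UPDATE at the SQL level, bypassing any matching objects already loaded into the session, and the 'fetch' strategy has SQLAlchemy select the affected rows so those in-memory objects are brought in line with the new state. A minimal, self-contained sketch of the pattern, assuming SQLAlchemy 1.4+; the TI model here is a toy stand-in for Airflow's TaskInstance, and only the shape of the update() call mirrors the diff:

    from sqlalchemy import Column, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class TI(Base):
        """Toy stand-in for airflow.models.TaskInstance."""
        __tablename__ = "task_instance"
        task_id = Column(String, primary_key=True)
        dag_id = Column(String)
        state = Column(String)

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all(
            [
                TI(task_id="a", dag_id="gone", state="scheduled"),
                TI(task_id="b", dag_id="gone", state="scheduled"),
            ]
        )
        session.flush()
        # Same bulk-UPDATE shape as the fix: every scheduled TI of the
        # missing dag is flipped to failed in a single statement.
        session.query(TI).filter(TI.dag_id == "gone", TI.state == "scheduled").update(
            {TI.state: "failed"}, synchronize_session="fetch"
        )
        # 'fetch' keeps the already-loaded objects consistent with the UPDATE.
        assert all(ti.state == "failed" for ti in session.query(TI))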
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a308029..7185720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -610,6 +610,34 @@ class TestSchedulerJob:
         session.rollback()
         session.close()
 
+    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of missing DAGs are failed"""
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
+        task_id_1 = 'dummy'
+        task_id_2 = 'dummydummy'
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
+            DummyOperator(task_id=task_id_1)
+            DummyOperator(task_id=task_id_2)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag = mock.MagicMock()
+        self.scheduler_job.dagbag.get_dag.return_value = None
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        tis = dr.task_instances
+        for ti in tis:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert 0 == len(res)
+        tis = dr.get_task_instances(session=session)
+        assert len(tis) == 2
+        assert all(ti.state == State.FAILED for ti in tis)
+
     def test_nonexistent_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_nonexistent_pool'
         with dag_maker(dag_id=dag_id, max_active_tasks=16):
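
The new test (test_queued_task_instances_fails_with_missing_dag) drives the new branch by swapping the scheduler's dagbag for a MagicMock whose get_dag always returns None, so every task instance appears to belong to a missing DAG; it then asserts that nothing is queued and that both task instances end up FAILED. The mocking pattern in isolation:

    from unittest import mock

    dagbag = mock.MagicMock()
    dagbag.get_dag.return_value = None  # every dag_id now looks missing
    assert dagbag.get_dag("any_dag_id") is None

From an Airflow development checkout, this single test can be run with, e.g., pytest "tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_queued_task_instances_fails_with_missing_dag".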