You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/01/31 16:51:38 UTC

[airflow] branch v2-2-test updated: Fix Scheduler crash when executing task instances of missing DAG (#20349)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-2-test by this push:
     new 9cdc3e6  Fix Scheduler crash when executing task instances of missing DAG (#20349)
9cdc3e6 is described below

commit 9cdc3e6b4618651a3f638345664c53ed4a3b7458
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jan 13 22:23:10 2022 +0100

    Fix Scheduler crash when executing task instances of missing DAG (#20349)
    
    When executing task instances, we do not check if the dag is missing in
    the dagbag. This PR fixes it by failing the scheduled task instances of a
    dag that we can't find in the serialized dag table.
    
    Closes: #20099
    (cherry picked from commit 98715760f72e5205c291293088b5e79636884491)
---
 airflow/jobs/scheduler_job.py    | 11 +++++++++++
 tests/jobs/test_scheduler_job.py | 28 ++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2fedf80..490d507 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -375,6 +375,17 @@ class SchedulerJob(BaseJob):
                     # Many dags don't have a task_concurrency, so where we can avoid loading the full
                     # serialized DAG the better.
                     serialized_dag = self.dagbag.get_dag(dag_id, session=session)
+                    # If the dag is missing, fail all its scheduled task instances and continue.
+                    if not serialized_dag:
+                        self.log.error(
+                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                            dag_id,
+                            task_instance,
+                        )
+                        session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update(
+                            {TI.state: State.FAILED}, synchronize_session='fetch'
+                        )
+                        continue
                     if serialized_dag.has_task(task_instance.task_id):
                         task_concurrency_limit = serialized_dag.get_task(
                             task_instance.task_id
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a308029..7185720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -610,6 +610,34 @@ class TestSchedulerJob:
         session.rollback()
         session.close()
 
+    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of missing DAGs are failed"""
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag'
+        task_id_1 = 'dummy'
+        task_id_2 = 'dummydummy'
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
+            DummyOperator(task_id=task_id_1)  # presumably inherits max_active_tis_per_dag=1 — confirm
+            DummyOperator(task_id=task_id_2)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.dagbag = mock.MagicMock()  # replace the real dagbag with a mock
+        self.scheduler_job.dagbag.get_dag.return_value = None  # every lookup reports the DAG as missing
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+        tis = dr.task_instances
+        for ti in tis:
+            ti.state = State.SCHEDULED  # put both task instances in SCHEDULED before the call under test
+            session.merge(ti)
+        session.flush()
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+        assert 0 == len(res)  # nothing is returned for queuing
+        tis = dr.get_task_instances(session=session)  # re-read the task instances through the session
+        assert len(tis) == 2
+        assert all(ti.state == State.FAILED for ti in tis)  # both were marked FAILED
+
     def test_nonexistent_pool(self, dag_maker):
         dag_id = 'SchedulerJobTest.test_nonexistent_pool'
         with dag_maker(dag_id=dag_id, max_active_tasks=16):