You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/08/15 18:44:46 UTC

[airflow] 10/45: Fix zombie task handling with multiple schedulers (#24906)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b6df301ac9e772bdf257a6e3b6ead25c703ff2f
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jul 8 10:49:12 2022 -0600

    Fix zombie task handling with multiple schedulers (#24906)
    
    Each scheduler was looking at all running tasks for zombies, which led to
    multiple schedulers handling the same zombies. This caused problems with
    retries (e.g. tasks being marked as FAILED instead of UP_FOR_RETRY) and
    callbacks (e.g. `on_failure_callback` being called multiple times).
    
    When the second scheduler tries to determine whether the task can be
    retried, and the task is already in UP_FOR_RETRY (because the first
    scheduler has already finished handling it), the second scheduler sees the
    "next" try_number (as the task is no longer running), which leads it to
    mark the task as FAILED instead.
    
    The easy fix is to simply restrict each scheduler to its own TIs; this is
    safe because orphaned running TIs will be adopted by another scheduler
    anyway.
    
    (cherry picked from commit 1c0d0a5d907ae447b7221200952b47b69f8f8e87)
---
 airflow/jobs/scheduler_job.py    |  4 +++-
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8e22065618..539e6f1eff 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1336,7 +1336,8 @@ class SchedulerJob(BaseJob):
     def _find_zombies(self, session):
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
-        and update the current zombie list.
+        or have a no-longer-running LocalTaskJob, and send them off to the DAG processor
+        to be handled.
         """
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
@@ -1352,6 +1353,7 @@ class SchedulerJob(BaseJob):
                     LocalTaskJob.latest_heartbeat < limit_dttm,
                 )
             )
+            .filter(TaskInstance.queued_by_job_id == self.id)
             .all()
         )
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6027867ab9..dfd66770c0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3890,7 +3890,6 @@ class TestSchedulerJob:
             session.query(LocalTaskJob).delete()
             dag = dagbag.get_dag('example_branch_operator')
             dag.sync_to_db()
-            task = dag.get_task(task_id='run_this_first')
 
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
@@ -3899,21 +3898,33 @@ class TestSchedulerJob:
                 session=session,
             )
 
-            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-            local_job = LocalTaskJob(ti)
-            local_job.state = State.SHUTDOWN
-
-            session.add(local_job)
-            session.flush()
-
-            ti.job_id = local_job.id
-            session.add(ti)
-            session.flush()
-
             self.scheduler_job = SchedulerJob(subdir=os.devnull)
             self.scheduler_job.executor = MockExecutor()
             self.scheduler_job.processor_agent = mock.MagicMock()
 
+            # We will provision 2 tasks so we can check we only find zombies from this scheduler
+            tasks_to_setup = ['branching', 'run_this_first']
+
+            for task_id in tasks_to_setup:
+                task = dag.get_task(task_id=task_id)
+                ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+                ti.queued_by_job_id = 999
+
+                local_job = LocalTaskJob(ti)
+                local_job.state = State.SHUTDOWN
+
+                session.add(local_job)
+                session.flush()
+
+                ti.job_id = local_job.id
+                session.add(ti)
+                session.flush()
+
+            assert task.task_id == 'run_this_first'  # Make sure we have the task/ti we expect
+
+            ti.queued_by_job_id = self.scheduler_job.id
+            session.flush()
+
             self.scheduler_job._find_zombies(session=session)
 
             self.scheduler_job.executor.callback_sink.send.assert_called_once()