Posted to commits@airflow.apache.org by po...@apache.org on 2022/02/10 09:42:48 UTC

[airflow] branch main updated: Move Zombie detection to SchedulerJob (#21181)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0abee18  Move Zombie detection to SchedulerJob (#21181)
0abee18 is described below

commit 0abee18a7d511e490546009cdf2690dec2de2170
Author: mhenc <mh...@google.com>
AuthorDate: Thu Feb 10 10:41:59 2022 +0100

    Move Zombie detection to SchedulerJob (#21181)
---
 airflow/config_templates/config.yml          |   7 ++
 airflow/config_templates/default_airflow.cfg |   3 +
 airflow/dag_processing/manager.py            |  58 +----------
 airflow/jobs/scheduler_job.py                |  44 ++++++++
 airflow/models/dagbag.py                     |   1 -
 tests/dag_processing/test_manager.py         | 149 +--------------------------
 tests/jobs/test_scheduler_job.py             | 113 +++++++++++++++++++-
 7 files changed, 168 insertions(+), 207 deletions(-)
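
In short: the zombie check that previously ran inside DagFileProcessorManager's
processing loop now runs inside SchedulerJob, on a timer driven by the new
[scheduler] zombie_detection_interval option. The detection criterion itself is
unchanged: a running task instance is treated as a zombie when its LocalTaskJob
is no longer running, or when the job's latest heartbeat is older than
scheduler_zombie_task_threshold seconds. A minimal sketch of that predicate,
using plain values instead of Airflow's ORM models (names here are illustrative
only; the real check is the SQLAlchemy query in SchedulerJob._find_zombies
further down in this commit):

    from datetime import datetime, timedelta, timezone

    def is_zombie(job_state: str, latest_heartbeat: datetime, threshold_secs: int = 300) -> bool:
        # A running task instance counts as a zombie when its local task job
        # stopped running, or stopped heartbeating within the threshold window.
        limit_dttm = datetime.now(timezone.utc) - timedelta(seconds=threshold_secs)
        return job_state != "running" or latest_heartbeat < limit_dttm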

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8ef738f..83ea6b9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1850,6 +1850,13 @@
       type: string
       example: ~
       default: "300"
+    - name: zombie_detection_interval
+      description: |
+        How often (in seconds) should the scheduler check for zombie tasks.
+      version_added: 2.3.0
+      type: float
+      example: ~
+      default: "10.0"
     - name: catchup_by_default
       description: |
         Turn off scheduler catchup by setting this to ``False``.
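
The new option is read with a fallback, so an existing airflow.cfg without the
key keeps the 10-second default. For reference, the two related settings are
fetched like this (mirroring the conf calls added to scheduler_job.py further
down in this commit):

    from airflow.configuration import conf

    # Seconds without a heartbeat before a task is considered a zombie (default 300).
    zombie_threshold = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
    # Seconds between zombie checks in the scheduler loop (default 10.0).
    detection_interval = conf.getfloat('scheduler', 'zombie_detection_interval', fallback=10.0)
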
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 520ab44..55161a55 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -923,6 +923,9 @@ child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
 # associated task instance as failed and will re-schedule the task.
 scheduler_zombie_task_threshold = 300
 
+# How often (in seconds) should the scheduler check for zombie tasks.
+zombie_detection_interval = 10.0
+
 # Turn off scheduler catchup by setting this to ``False``.
 # Default behavior is unchanged and
 # Command Line Backfills still work, but the scheduler
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3b8a998..33b219c 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -34,7 +34,6 @@ from multiprocessing.connection import Connection as MultiprocessingConnection
 from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast
 
 from setproctitle import setproctitle
-from sqlalchemy import or_
 from tabulate import tabulate
 
 import airflow.models
@@ -42,17 +41,15 @@ from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models import DagModel, errors
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.stats import Stats
 from airflow.utils import timezone
-from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.utils.file import list_py_file_paths, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.net import get_hostname
 from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
 from airflow.utils.session import provide_session
-from airflow.utils.state import State
 
 if TYPE_CHECKING:
     import pathlib
@@ -434,8 +431,6 @@ class DagFileProcessorManager(LoggingMixin):
         # How often to print out DAG file processing stats to the log. Default to
         # 30 seconds.
         self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
-        # How many seconds do we wait for tasks to heartbeat before mark them as zombies.
-        self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
 
         # Map from file path to the processor
         self._processors: Dict[str, DagFileProcessorProcess] = {}
@@ -445,13 +440,10 @@ class DagFileProcessorManager(LoggingMixin):
         # Map from file path to stats about the file
         self._file_stats: Dict[str, DagFileStat] = {}
 
-        self._last_zombie_query_time = None
         # Last time that the DAG dir was traversed to look for files
         self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
         # Last time stats were printed
         self.last_stat_print_time = 0
-        # TODO: Remove magic number
-        self._zombie_query_interval = 10
         # How long to wait before timing out a process to parse a DAG file
         self._processor_timeout = processor_timeout
 
@@ -566,7 +558,6 @@ class DagFileProcessorManager(LoggingMixin):
                 self._processors.pop(processor.file_path)
 
             self._refresh_dag_dir()
-            self._find_zombies()
 
             self._kill_timed_out_processors()
 
@@ -1023,53 +1014,6 @@ class DagFileProcessorManager(LoggingMixin):
 
         self._file_path_queue.extend(files_paths_to_queue)
 
-    @provide_session
-    def _find_zombies(self, session):
-        """
-        Find zombie task instances, which are tasks haven't heartbeated for too long
-        and update the current zombie list.
-        """
-        now = timezone.utcnow()
-        if (
-            not self._last_zombie_query_time
-            or (now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval
-        ):
-            # to avoid circular imports
-            from airflow.jobs.local_task_job import LocalTaskJob as LJ
-
-            self.log.info("Finding 'running' jobs without a recent heartbeat")
-            TI = airflow.models.TaskInstance
-            DM = airflow.models.DagModel
-            limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
-
-            zombies = (
-                session.query(TI, DM.fileloc)
-                .join(LJ, TI.job_id == LJ.id)
-                .join(DM, TI.dag_id == DM.dag_id)
-                .filter(TI.state == State.RUNNING)
-                .filter(
-                    or_(
-                        LJ.state != State.RUNNING,
-                        LJ.latest_heartbeat < limit_dttm,
-                    )
-                )
-                .all()
-            )
-
-            if zombies:
-                self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
-
-            self._last_zombie_query_time = timezone.utcnow()
-            for ti, file_loc in zombies:
-                request = TaskCallbackRequest(
-                    full_filepath=file_loc,
-                    simple_task_instance=SimpleTaskInstance(ti),
-                    msg=f"Detected {ti} as zombie",
-                )
-                self.log.error("Detected zombie job: %s", request)
-                self._add_callback_to_queue(request)
-                Stats.incr('zombies_killed')
-
     def _kill_timed_out_processors(self):
         """Kill any file processors that timeout to defend against process hangs."""
         now = timezone.utcnow()
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7a6e3ef..6211655 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -38,6 +38,7 @@ from airflow.configuration import conf
 from airflow.dag_processing.manager import DagFileProcessorAgent
 from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
 from airflow.jobs.base_job import BaseJob
+from airflow.jobs.local_task_job import LocalTaskJob
 from airflow.models import DAG
 from airflow.models.dag import DagModel
 from airflow.models.dagbag import DagBag
@@ -123,6 +124,8 @@ class SchedulerJob(BaseJob):
             )
             scheduler_idle_sleep_time = processor_poll_interval
         self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
+        # How many seconds do we wait for tasks to heartbeat before marking them as zombies.
+        self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
 
         self.do_pickle = do_pickle
         super().__init__(*args, **kwargs)
@@ -739,6 +742,11 @@ class SchedulerJob(BaseJob):
             self._emit_pool_metrics,
         )
 
+        timers.call_regular_interval(
+            conf.getfloat('scheduler', 'zombie_detection_interval', fallback=10.0),
+            self._find_zombies,
+        )
+
         for loop_count in itertools.count(start=1):
             with Stats.timer() as timer:
 
@@ -1259,3 +1267,39 @@ class SchedulerJob(BaseJob):
         )
         if num_timed_out_tasks:
             self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
+
+    @provide_session
+    def _find_zombies(self, session):
+        """
+        Find zombie task instances, which are tasks that haven't heartbeated for too long,
+        and update the current zombie list.
+        """
+        self.log.debug("Finding 'running' jobs without a recent heartbeat")
+        limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
+
+        zombies = (
+            session.query(TaskInstance, DagModel.fileloc)
+            .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
+            .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
+            .filter(TaskInstance.state == State.RUNNING)
+            .filter(
+                or_(
+                    LocalTaskJob.state != State.RUNNING,
+                    LocalTaskJob.latest_heartbeat < limit_dttm,
+                )
+            )
+            .all()
+        )
+
+        if zombies:
+            self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
+
+        for ti, file_loc in zombies:
+            request = TaskCallbackRequest(
+                full_filepath=file_loc,
+                simple_task_instance=SimpleTaskInstance(ti),
+                msg=f"Detected {ti} as zombie",
+            )
+            self.log.error("Detected zombie job: %s", request)
+            self.processor_agent.send_callback_to_execute(request)
+            Stats.incr('zombies_killed')
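
The check is registered with timers.call_regular_interval(...) above, so it
piggybacks on the scheduler's main loop instead of owning a thread. A rough,
self-contained sketch of that regular-interval pattern (this is not the
implementation behind the timers object above; the helper name below is
hypothetical):

    import time

    def call_at_most_every(interval: float, action, *, clock=time.monotonic):
        """Return a callable that invokes `action` at most once per `interval` seconds."""
        last_run = clock()

        def maybe_run():
            nonlocal last_run
            now = clock()
            if now - last_run >= interval:
                action()
                last_run = now

        return maybe_run

    # Usage inside a scheduler-style loop:
    #   check_zombies = call_at_most_every(10.0, find_zombies)
    #   while scheduler_is_running:
    #       ... one scheduling iteration ...
    #       check_zombies()
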
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index eac62dc..0136d7f 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -89,7 +89,6 @@ class DagBag(LoggingMixin):
     """
 
     DAGBAG_IMPORT_TIMEOUT = conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
-    SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
 
     def __init__(
         self,
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 5ea21a2..2746e59 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -45,17 +45,13 @@ from airflow.dag_processing.manager import (
     DagParsingStat,
 )
 from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.jobs.local_task_job import LocalTaskJob as LJ
-from airflow.models import DagBag, DagModel, TaskInstance as TI, errors
+from airflow.models import DagBag, DagModel, errors
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import SimpleTaskInstance
 from airflow.utils import timezone
-from airflow.utils.callback_requests import CallbackRequest, TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest
 from airflow.utils.net import get_hostname
 from airflow.utils.session import create_session
-from airflow.utils.state import DagRunState, State
-from airflow.utils.types import DagRunType
 from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
 from tests.models import TEST_DAGS_FOLDER
 from tests.test_utils.config import conf_vars
@@ -455,147 +451,6 @@ class TestDagFileProcessorManager:
                 > (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
             )
 
-    def test_find_zombies(self):
-        manager = DagFileProcessorManager(
-            dag_directory='directory',
-            max_runs=1,
-            processor_timeout=timedelta.max,
-            signal_conn=MagicMock(),
-            dag_ids=[],
-            pickle_dags=False,
-            async_mode=True,
-        )
-
-        dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
-        with create_session() as session:
-            session.query(LJ).delete()
-            dag = dagbag.get_dag('example_branch_operator')
-            dag.sync_to_db()
-            task = dag.get_task(task_id='run_this_first')
-
-            dag_run = dag.create_dagrun(
-                state=DagRunState.RUNNING,
-                execution_date=DEFAULT_DATE,
-                run_type=DagRunType.SCHEDULED,
-                session=session,
-            )
-
-            ti = TI(task, run_id=dag_run.run_id, state=State.RUNNING)
-            local_job = LJ(ti)
-            local_job.state = State.SHUTDOWN
-
-            session.add(local_job)
-            session.flush()
-
-            ti.job_id = local_job.id
-            session.add(ti)
-            session.flush()
-
-            manager._last_zombie_query_time = timezone.utcnow() - timedelta(
-                seconds=manager._zombie_threshold_secs + 1
-            )
-            manager._find_zombies()
-            requests = manager._callback_to_execute[dag.fileloc]
-            assert 1 == len(requests)
-            assert requests[0].full_filepath == dag.fileloc
-            assert requests[0].msg == f"Detected {ti} as zombie"
-            assert requests[0].is_failure_callback is True
-            assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
-            assert ti.dag_id == requests[0].simple_task_instance.dag_id
-            assert ti.task_id == requests[0].simple_task_instance.task_id
-            assert ti.run_id == requests[0].simple_task_instance.run_id
-
-            session.query(TI).delete()
-            session.query(LJ).delete()
-
-    @mock.patch('airflow.dag_processing.manager.DagFileProcessorProcess')
-    def test_handle_failure_callback_with_zombies_are_correctly_passed_to_dag_file_processor(
-        self, mock_processor
-    ):
-        """
-        Check that the same set of failure callback with zombies are passed to the dag
-        file processors until the next zombie detection logic is invoked.
-        """
-        test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
-        with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}):
-            dagbag = DagBag(test_dag_path, read_dags_from_db=False)
-            with create_session() as session:
-                session.query(LJ).delete()
-                dag = dagbag.get_dag('test_example_bash_operator')
-                dag.sync_to_db()
-
-                dag_run = dag.create_dagrun(
-                    state=DagRunState.RUNNING,
-                    execution_date=DEFAULT_DATE,
-                    run_type=DagRunType.SCHEDULED,
-                    session=session,
-                )
-                task = dag.get_task(task_id='run_this_last')
-
-                ti = TI(task, run_id=dag_run.run_id, state=State.RUNNING)
-                local_job = LJ(ti)
-                local_job.state = State.SHUTDOWN
-                session.add(local_job)
-                session.flush()
-
-                # TODO: If there was an actual Relationship between TI and Job
-                # we wouldn't need this extra commit
-                session.add(ti)
-                ti.job_id = local_job.id
-                session.flush()
-
-                expected_failure_callback_requests = [
-                    TaskCallbackRequest(
-                        full_filepath=dag.fileloc,
-                        simple_task_instance=SimpleTaskInstance(ti),
-                        msg="Message",
-                    )
-                ]
-
-            test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
-
-            child_pipe, parent_pipe = multiprocessing.Pipe()
-            async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
-
-            fake_processors = []
-
-            def fake_processor_(*args, **kwargs):
-                nonlocal fake_processors
-                processor = FakeDagFileProcessorRunner._create_process(*args, **kwargs)
-                fake_processors.append(processor)
-                return processor
-
-            mock_processor.side_effect = fake_processor_
-
-            manager = DagFileProcessorManager(
-                dag_directory=test_dag_path,
-                max_runs=1,
-                processor_timeout=timedelta.max,
-                signal_conn=child_pipe,
-                dag_ids=[],
-                pickle_dags=False,
-                async_mode=async_mode,
-            )
-
-            self.run_processor_manager_one_loop(manager, parent_pipe)
-
-            if async_mode:
-                # Once for initial parse, and then again for the add_callback_to_queue
-                assert len(fake_processors) == 2
-                assert fake_processors[0]._file_path == str(test_dag_path)
-                assert fake_processors[0]._callback_requests == []
-            else:
-                assert len(fake_processors) == 1
-
-            assert fake_processors[-1]._file_path == str(test_dag_path)
-            callback_requests = fake_processors[-1]._callback_requests
-            assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
-                result.simple_task_instance.key for result in callback_requests
-            }
-
-            child_pipe.close()
-            parent_pipe.close()
-
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
     @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
     def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 707f587..845ffda 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -40,16 +40,17 @@ from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.jobs.backfill_job import BackfillJob
 from airflow.jobs.base_job import BaseJob
+from airflow.jobs.local_task_job import LocalTaskJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
 from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.serialization.serialized_objects import SerializedDAG
 from airflow.utils import timezone
-from airflow.utils.callback_requests import DagCallbackRequest
+from airflow.utils.callback_requests import DagCallbackRequest, TaskCallbackRequest
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -3480,6 +3481,114 @@ class TestSchedulerJob:
         assert ti1.next_method == "__fail__"
         assert ti2.state == State.DEFERRED
 
+    def test_find_zombies_nothing(self):
+        with create_session() as session:
+            self.scheduler_job = SchedulerJob()
+            self.scheduler_job.processor_agent = mock.MagicMock()
+
+            self.scheduler_job._find_zombies(session=session)
+
+            self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+
+    def test_find_zombies(self):
+        dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
+        with create_session() as session:
+            session.query(LocalTaskJob).delete()
+            dag = dagbag.get_dag('example_branch_operator')
+            dag.sync_to_db()
+            task = dag.get_task(task_id='run_this_first')
+
+            dag_run = dag.create_dagrun(
+                state=DagRunState.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+
+            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+            local_job = LocalTaskJob(ti)
+            local_job.state = State.SHUTDOWN
+
+            session.add(local_job)
+            session.flush()
+
+            ti.job_id = local_job.id
+            session.add(ti)
+            session.flush()
+
+            self.scheduler_job = SchedulerJob(subdir=os.devnull)
+            self.scheduler_job.processor_agent = mock.MagicMock()
+
+            self.scheduler_job._find_zombies(session=session)
+
+            self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once()
+            requests = self.scheduler_job.processor_agent.send_callback_to_execute.call_args[0]
+            assert 1 == len(requests)
+            assert requests[0].full_filepath == dag.fileloc
+            assert requests[0].msg == f"Detected {ti} as zombie"
+            assert requests[0].is_failure_callback is True
+            assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
+            assert ti.dag_id == requests[0].simple_task_instance.dag_id
+            assert ti.task_id == requests[0].simple_task_instance.task_id
+            assert ti.run_id == requests[0].simple_task_instance.run_id
+
+            session.query(TaskInstance).delete()
+            session.query(LocalTaskJob).delete()
+
+    def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor(self):
+        """
+        Check that the same set of failure callbacks with zombies is passed to the dag
+        file processors until the next zombie detection logic is invoked.
+        """
+        with conf_vars({('core', 'load_examples'): 'False'}):
+            dagbag = DagBag(
+                dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
+                read_dags_from_db=False,
+            )
+            session = settings.Session()
+            session.query(LocalTaskJob).delete()
+            dag = dagbag.get_dag('test_example_bash_operator')
+            dag.sync_to_db()
+
+            dag_run = dag.create_dagrun(
+                state=DagRunState.RUNNING,
+                execution_date=DEFAULT_DATE,
+                run_type=DagRunType.SCHEDULED,
+                session=session,
+            )
+            task = dag.get_task(task_id='run_this_last')
+
+            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+            local_job = LocalTaskJob(ti)
+            local_job.state = State.SHUTDOWN
+            session.add(local_job)
+            session.flush()
+
+            # TODO: If there was an actual Relationship between TI and Job
+            # we wouldn't need this extra commit
+            session.add(ti)
+            ti.job_id = local_job.id
+            session.flush()
+
+            expected_failure_callback_requests = [
+                TaskCallbackRequest(
+                    full_filepath=dag.fileloc,
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message",
+                )
+            ]
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        self.scheduler_job._find_zombies(session=session)
+
+        self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once()
+        callback_requests = self.scheduler_job.processor_agent.send_callback_to_execute.call_args[0]
+        assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
+            result.simple_task_instance.key for result in callback_requests
+        }
+
 
 @pytest.mark.xfail(reason="Work out where this goes")
 def test_task_with_upstream_skip_process_task_instances():