You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/02/10 09:42:48 UTC
[airflow] branch main updated: Move Zombie detection to SchedulerJob (#21181)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0abee18 Move Zombie detection to SchedulerJob (#21181)
0abee18 is described below
commit 0abee18a7d511e490546009cdf2690dec2de2170
Author: mhenc <mh...@google.com>
AuthorDate: Thu Feb 10 10:41:59 2022 +0100
Move Zombie detection to SchedulerJob (#21181)
---
airflow/config_templates/config.yml | 7 ++
airflow/config_templates/default_airflow.cfg | 3 +
airflow/dag_processing/manager.py | 58 +----------
airflow/jobs/scheduler_job.py | 44 ++++++++
airflow/models/dagbag.py | 1 -
tests/dag_processing/test_manager.py | 149 +--------------------------
tests/jobs/test_scheduler_job.py | 113 +++++++++++++++++++-
7 files changed, 168 insertions(+), 207 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 8ef738f..83ea6b9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1850,6 +1850,13 @@
type: string
example: ~
default: "300"
+ - name: zombie_detection_interval
+ description: |
+ How often (in seconds) should the scheduler check for zombie tasks.
+ version_added: 2.3.0
+ type: float
+ example: ~
+ default: "10.0"
- name: catchup_by_default
description: |
Turn off scheduler catchup by setting this to ``False``.
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 520ab44..55161a55 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -923,6 +923,9 @@ child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300
+# How often (in seconds) should the scheduler check for zombie tasks.
+zombie_detection_interval = 10.0
+
# Turn off scheduler catchup by setting this to ``False``.
# Default behavior is unchanged and
# Command Line Backfills still work, but the scheduler
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3b8a998..33b219c 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -34,7 +34,6 @@ from multiprocessing.connection import Connection as MultiprocessingConnection
from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Union, cast
from setproctitle import setproctitle
-from sqlalchemy import or_
from tabulate import tabulate
import airflow.models
@@ -42,17 +41,15 @@ from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models import DagModel, errors
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import SimpleTaskInstance
from airflow.stats import Stats
from airflow.utils import timezone
-from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest, TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.session import provide_session
-from airflow.utils.state import State
if TYPE_CHECKING:
import pathlib
@@ -434,8 +431,6 @@ class DagFileProcessorManager(LoggingMixin):
# How often to print out DAG file processing stats to the log. Default to
# 30 seconds.
self.print_stats_interval = conf.getint('scheduler', 'print_stats_interval')
- # How many seconds do we wait for tasks to heartbeat before mark them as zombies.
- self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
# Map from file path to the processor
self._processors: Dict[str, DagFileProcessorProcess] = {}
@@ -445,13 +440,10 @@ class DagFileProcessorManager(LoggingMixin):
# Map from file path to stats about the file
self._file_stats: Dict[str, DagFileStat] = {}
- self._last_zombie_query_time = None
# Last time that the DAG dir was traversed to look for files
self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
# Last time stats were printed
self.last_stat_print_time = 0
- # TODO: Remove magic number
- self._zombie_query_interval = 10
# How long to wait before timing out a process to parse a DAG file
self._processor_timeout = processor_timeout
@@ -566,7 +558,6 @@ class DagFileProcessorManager(LoggingMixin):
self._processors.pop(processor.file_path)
self._refresh_dag_dir()
- self._find_zombies()
self._kill_timed_out_processors()
@@ -1023,53 +1014,6 @@ class DagFileProcessorManager(LoggingMixin):
self._file_path_queue.extend(files_paths_to_queue)
- @provide_session
- def _find_zombies(self, session):
- """
- Find zombie task instances, which are tasks haven't heartbeated for too long
- and update the current zombie list.
- """
- now = timezone.utcnow()
- if (
- not self._last_zombie_query_time
- or (now - self._last_zombie_query_time).total_seconds() > self._zombie_query_interval
- ):
- # to avoid circular imports
- from airflow.jobs.local_task_job import LocalTaskJob as LJ
-
- self.log.info("Finding 'running' jobs without a recent heartbeat")
- TI = airflow.models.TaskInstance
- DM = airflow.models.DagModel
- limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
-
- zombies = (
- session.query(TI, DM.fileloc)
- .join(LJ, TI.job_id == LJ.id)
- .join(DM, TI.dag_id == DM.dag_id)
- .filter(TI.state == State.RUNNING)
- .filter(
- or_(
- LJ.state != State.RUNNING,
- LJ.latest_heartbeat < limit_dttm,
- )
- )
- .all()
- )
-
- if zombies:
- self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
-
- self._last_zombie_query_time = timezone.utcnow()
- for ti, file_loc in zombies:
- request = TaskCallbackRequest(
- full_filepath=file_loc,
- simple_task_instance=SimpleTaskInstance(ti),
- msg=f"Detected {ti} as zombie",
- )
- self.log.error("Detected zombie job: %s", request)
- self._add_callback_to_queue(request)
- Stats.incr('zombies_killed')
-
def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""
now = timezone.utcnow()
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 7a6e3ef..6211655 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -38,6 +38,7 @@ from airflow.configuration import conf
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
from airflow.jobs.base_job import BaseJob
+from airflow.jobs.local_task_job import LocalTaskJob
from airflow.models import DAG
from airflow.models.dag import DagModel
from airflow.models.dagbag import DagBag
@@ -123,6 +124,8 @@ class SchedulerJob(BaseJob):
)
scheduler_idle_sleep_time = processor_poll_interval
self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
+ # How many seconds do we wait for tasks to heartbeat before marking them as zombies.
+ self._zombie_threshold_secs = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
self.do_pickle = do_pickle
super().__init__(*args, **kwargs)
@@ -739,6 +742,11 @@ class SchedulerJob(BaseJob):
self._emit_pool_metrics,
)
+ timers.call_regular_interval(
+ conf.getfloat('scheduler', 'zombie_detection_interval', fallback=10.0),
+ self._find_zombies,
+ )
+
for loop_count in itertools.count(start=1):
with Stats.timer() as timer:
@@ -1259,3 +1267,39 @@ class SchedulerJob(BaseJob):
)
if num_timed_out_tasks:
self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
+
+ @provide_session
+ def _find_zombies(self, session):
+ """
+ Find zombie task instances, which are tasks that haven't heartbeated for too long,
+ and send a callback request for each of them to the processor agent.
+ """
+ self.log.debug("Finding 'running' jobs without a recent heartbeat")
+ limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
+
+ zombies = (
+ session.query(TaskInstance, DagModel.fileloc)
+ .join(LocalTaskJob, TaskInstance.job_id == LocalTaskJob.id)
+ .join(DagModel, TaskInstance.dag_id == DagModel.dag_id)
+ .filter(TaskInstance.state == State.RUNNING)
+ .filter(
+ or_(
+ LocalTaskJob.state != State.RUNNING,
+ LocalTaskJob.latest_heartbeat < limit_dttm,
+ )
+ )
+ .all()
+ )
+
+ if zombies:
+ self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
+
+ for ti, file_loc in zombies:
+ request = TaskCallbackRequest(
+ full_filepath=file_loc,
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg=f"Detected {ti} as zombie",
+ )
+ self.log.error("Detected zombie job: %s", request)
+ self.processor_agent.send_callback_to_execute(request)
+ Stats.incr('zombies_killed')
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index eac62dc..0136d7f 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -89,7 +89,6 @@ class DagBag(LoggingMixin):
"""
DAGBAG_IMPORT_TIMEOUT = conf.getfloat('core', 'DAGBAG_IMPORT_TIMEOUT')
- SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')
def __init__(
self,
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 5ea21a2..2746e59 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -45,17 +45,13 @@ from airflow.dag_processing.manager import (
DagParsingStat,
)
from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.jobs.local_task_job import LocalTaskJob as LJ
-from airflow.models import DagBag, DagModel, TaskInstance as TI, errors
+from airflow.models import DagBag, DagModel, errors
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import SimpleTaskInstance
from airflow.utils import timezone
-from airflow.utils.callback_requests import CallbackRequest, TaskCallbackRequest
+from airflow.utils.callback_requests import CallbackRequest
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
-from airflow.utils.state import DagRunState, State
-from airflow.utils.types import DagRunType
from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.config import conf_vars
@@ -455,147 +451,6 @@ class TestDagFileProcessorManager:
> (freezed_base_time - manager.get_last_finish_time("file_1.py")).total_seconds()
)
- def test_find_zombies(self):
- manager = DagFileProcessorManager(
- dag_directory='directory',
- max_runs=1,
- processor_timeout=timedelta.max,
- signal_conn=MagicMock(),
- dag_ids=[],
- pickle_dags=False,
- async_mode=True,
- )
-
- dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
- with create_session() as session:
- session.query(LJ).delete()
- dag = dagbag.get_dag('example_branch_operator')
- dag.sync_to_db()
- task = dag.get_task(task_id='run_this_first')
-
- dag_run = dag.create_dagrun(
- state=DagRunState.RUNNING,
- execution_date=DEFAULT_DATE,
- run_type=DagRunType.SCHEDULED,
- session=session,
- )
-
- ti = TI(task, run_id=dag_run.run_id, state=State.RUNNING)
- local_job = LJ(ti)
- local_job.state = State.SHUTDOWN
-
- session.add(local_job)
- session.flush()
-
- ti.job_id = local_job.id
- session.add(ti)
- session.flush()
-
- manager._last_zombie_query_time = timezone.utcnow() - timedelta(
- seconds=manager._zombie_threshold_secs + 1
- )
- manager._find_zombies()
- requests = manager._callback_to_execute[dag.fileloc]
- assert 1 == len(requests)
- assert requests[0].full_filepath == dag.fileloc
- assert requests[0].msg == f"Detected {ti} as zombie"
- assert requests[0].is_failure_callback is True
- assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
- assert ti.dag_id == requests[0].simple_task_instance.dag_id
- assert ti.task_id == requests[0].simple_task_instance.task_id
- assert ti.run_id == requests[0].simple_task_instance.run_id
-
- session.query(TI).delete()
- session.query(LJ).delete()
-
- @mock.patch('airflow.dag_processing.manager.DagFileProcessorProcess')
- def test_handle_failure_callback_with_zombies_are_correctly_passed_to_dag_file_processor(
- self, mock_processor
- ):
- """
- Check that the same set of failure callback with zombies are passed to the dag
- file processors until the next zombie detection logic is invoked.
- """
- test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
- with conf_vars({('scheduler', 'parsing_processes'): '1', ('core', 'load_examples'): 'False'}):
- dagbag = DagBag(test_dag_path, read_dags_from_db=False)
- with create_session() as session:
- session.query(LJ).delete()
- dag = dagbag.get_dag('test_example_bash_operator')
- dag.sync_to_db()
-
- dag_run = dag.create_dagrun(
- state=DagRunState.RUNNING,
- execution_date=DEFAULT_DATE,
- run_type=DagRunType.SCHEDULED,
- session=session,
- )
- task = dag.get_task(task_id='run_this_last')
-
- ti = TI(task, run_id=dag_run.run_id, state=State.RUNNING)
- local_job = LJ(ti)
- local_job.state = State.SHUTDOWN
- session.add(local_job)
- session.flush()
-
- # TODO: If there was an actual Relationship between TI and Job
- # we wouldn't need this extra commit
- session.add(ti)
- ti.job_id = local_job.id
- session.flush()
-
- expected_failure_callback_requests = [
- TaskCallbackRequest(
- full_filepath=dag.fileloc,
- simple_task_instance=SimpleTaskInstance(ti),
- msg="Message",
- )
- ]
-
- test_dag_path = TEST_DAG_FOLDER / 'test_example_bash_operator.py'
-
- child_pipe, parent_pipe = multiprocessing.Pipe()
- async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
-
- fake_processors = []
-
- def fake_processor_(*args, **kwargs):
- nonlocal fake_processors
- processor = FakeDagFileProcessorRunner._create_process(*args, **kwargs)
- fake_processors.append(processor)
- return processor
-
- mock_processor.side_effect = fake_processor_
-
- manager = DagFileProcessorManager(
- dag_directory=test_dag_path,
- max_runs=1,
- processor_timeout=timedelta.max,
- signal_conn=child_pipe,
- dag_ids=[],
- pickle_dags=False,
- async_mode=async_mode,
- )
-
- self.run_processor_manager_one_loop(manager, parent_pipe)
-
- if async_mode:
- # Once for initial parse, and then again for the add_callback_to_queue
- assert len(fake_processors) == 2
- assert fake_processors[0]._file_path == str(test_dag_path)
- assert fake_processors[0]._callback_requests == []
- else:
- assert len(fake_processors) == 1
-
- assert fake_processors[-1]._file_path == str(test_dag_path)
- callback_requests = fake_processors[-1]._callback_requests
- assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
- result.simple_task_instance.key for result in callback_requests
- }
-
- child_pipe.close()
- parent_pipe.close()
-
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 707f587..845ffda 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -40,16 +40,17 @@ from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.jobs.backfill_job import BackfillJob
from airflow.jobs.base_job import BaseJob
+from airflow.jobs.local_task_job import LocalTaskJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
-from airflow.models.taskinstance import TaskInstanceKey
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
-from airflow.utils.callback_requests import DagCallbackRequest
+from airflow.utils.callback_requests import DagCallbackRequest, TaskCallbackRequest
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -3480,6 +3481,114 @@ class TestSchedulerJob:
assert ti1.next_method == "__fail__"
assert ti2.state == State.DEFERRED
+ def test_find_zombies_nothing(self):
+ with create_session() as session:
+ self.scheduler_job = SchedulerJob()
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ self.scheduler_job._find_zombies(session=session)
+
+ self.scheduler_job.processor_agent.send_callback_to_execute.assert_not_called()
+
+ def test_find_zombies(self):
+ dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
+ with create_session() as session:
+ session.query(LocalTaskJob).delete()
+ dag = dagbag.get_dag('example_branch_operator')
+ dag.sync_to_db()
+ task = dag.get_task(task_id='run_this_first')
+
+ dag_run = dag.create_dagrun(
+ state=DagRunState.RUNNING,
+ execution_date=DEFAULT_DATE,
+ run_type=DagRunType.SCHEDULED,
+ session=session,
+ )
+
+ ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+ local_job = LocalTaskJob(ti)
+ local_job.state = State.SHUTDOWN
+
+ session.add(local_job)
+ session.flush()
+
+ ti.job_id = local_job.id
+ session.add(ti)
+ session.flush()
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ self.scheduler_job._find_zombies(session=session)
+
+ self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once()
+ requests = self.scheduler_job.processor_agent.send_callback_to_execute.call_args[0]
+ assert 1 == len(requests)
+ assert requests[0].full_filepath == dag.fileloc
+ assert requests[0].msg == f"Detected {ti} as zombie"
+ assert requests[0].is_failure_callback is True
+ assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
+ assert ti.dag_id == requests[0].simple_task_instance.dag_id
+ assert ti.task_id == requests[0].simple_task_instance.task_id
+ assert ti.run_id == requests[0].simple_task_instance.run_id
+
+ session.query(TaskInstance).delete()
+ session.query(LocalTaskJob).delete()
+
+ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor(self):
+ """
+ Check that the same set of failure callbacks for zombies is passed to the dag
+ file processors until the next zombie detection logic is invoked.
+ """
+ with conf_vars({('core', 'load_examples'): 'False'}):
+ dagbag = DagBag(
+ dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
+ read_dags_from_db=False,
+ )
+ session = settings.Session()
+ session.query(LocalTaskJob).delete()
+ dag = dagbag.get_dag('test_example_bash_operator')
+ dag.sync_to_db()
+
+ dag_run = dag.create_dagrun(
+ state=DagRunState.RUNNING,
+ execution_date=DEFAULT_DATE,
+ run_type=DagRunType.SCHEDULED,
+ session=session,
+ )
+ task = dag.get_task(task_id='run_this_last')
+
+ ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+ local_job = LocalTaskJob(ti)
+ local_job.state = State.SHUTDOWN
+ session.add(local_job)
+ session.flush()
+
+ # TODO: If there was an actual Relationship between TI and Job
+ # we wouldn't need this extra commit
+ session.add(ti)
+ ti.job_id = local_job.id
+ session.flush()
+
+ expected_failure_callback_requests = [
+ TaskCallbackRequest(
+ full_filepath=dag.fileloc,
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg="Message",
+ )
+ ]
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ self.scheduler_job._find_zombies(session=session)
+
+ self.scheduler_job.processor_agent.send_callback_to_execute.assert_called_once()
+ callback_requests = self.scheduler_job.processor_agent.send_callback_to_execute.call_args[0]
+ assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
+ result.simple_task_instance.key for result in callback_requests
+ }
+
@pytest.mark.xfail(reason="Work out where this goes")
def test_task_with_upstream_skip_process_task_instances():