You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/11/09 12:15:18 UTC
[airflow] branch master updated: Call scheduler "book-keeping"
operations less frequently. (#12139)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 92e405e Call scheduler "book-keeping" operations less frequently. (#12139)
92e405e is described below
commit 92e405e72922cc569a2e41281df9d055c3a7855d
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Nov 9 12:14:24 2020 +0000
Call scheduler "book-keeping" operations less frequently. (#12139)
This change makes it so that certain operations in the scheduler are
called on a regular interval, instead of only once at start up, or every
time around the loop:
- adopt_or_reset_orphaned_tasks (detecting SchedulerJobs that died) was
previously only called on start up.
- _clean_tis_without_dagrun was previously called every time around the
scheduling loop, but it does not need to run every time, as
this is a relatively rare cleanup operation.
- _emit_pool_metrics doesn't need to be called _every_ time around the
loop, once every 5 seconds is enough.
This uses the built in ["sched" module][sched] to handle the "timers".
[sched]: https://docs.python.org/3/library/sched.html
---
airflow/config_templates/config.yml | 22 +++++++
airflow/config_templates/default_airflow.cfg | 10 +++
airflow/jobs/scheduler_job.py | 91 ++++++++++++++++++++--------
tests/jobs/test_scheduler_job.py | 15 +++--
4 files changed, 106 insertions(+), 32 deletions(-)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6555c7b..ad75309 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1572,6 +1572,14 @@
type: string
example: ~
default: "5"
+ - name: clean_tis_without_dagrun
+ description: |
+ How often (in seconds) to check and tidy up 'running' TaskInstances
+ that no longer have a matching DagRun
+ version_added: 2.0.0
+ type: float
+ example: ~
+ default: "15.0"
- name: scheduler_heartbeat_sec
description: |
The scheduler constantly tries to trigger new tasks (look at the
@@ -1617,6 +1625,13 @@
type: string
example: ~
default: "30"
+ - name: pool_metrics_interval
+ description: |
+ How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled)
+ version_added: 2.0.0
+ type: float
+ example: ~
+ default: "5.0"
- name: scheduler_health_check_threshold
description: |
If the last scheduler heartbeat happened more than scheduler_health_check_threshold
@@ -1626,6 +1641,13 @@
type: string
example: ~
default: "30"
+ - name: orphaned_tasks_check_interval
+ description: |
+ How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
+ version_added: 2.0.0
+ type: float
+ example: ~
+ default: "300.0"
- name: child_process_log_directory
description: ~
version_added: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c2bbb64..bcd3b89 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -781,6 +781,10 @@ tls_key =
# listen (in seconds).
job_heartbeat_sec = 5
+# How often (in seconds) to check and tidy up 'running' TaskInstances
+# that no longer have a matching DagRun
+clean_tis_without_dagrun = 15.0
+
# The scheduler constantly tries to trigger new tasks (look at the
# scheduler section in the docs for more information). This defines
# how often the scheduler should run (in seconds).
@@ -802,10 +806,16 @@ dag_dir_list_interval = 300
# How often should stats be printed to the logs. Setting to 0 will disable printing stats
print_stats_interval = 30
+# How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled)
+pool_metrics_interval = 5.0
+
# If the last scheduler heartbeat happened more than scheduler_health_check_threshold
# ago (in seconds), scheduler is considered unhealthy.
# This is used by the health check in the "/health" endpoint
scheduler_health_check_threshold = 30
+
+# How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
+orphaned_tasks_check_interval = 300.0
child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
# Local task jobs periodically heartbeat to the DB. If the job has
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b454aba..7d1aa72 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -22,6 +22,7 @@ import itertools
import logging
import multiprocessing
import os
+import sched
import signal
import sys
import threading
@@ -30,7 +31,7 @@ from collections import defaultdict
from contextlib import ExitStack, redirect_stderr, redirect_stdout, suppress
from datetime import timedelta
from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
from setproctitle import setproctitle
from sqlalchemy import and_, func, not_, or_
@@ -1272,9 +1273,6 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
self.executor.job_id = self.id
self.executor.start()
- self.log.info("Resetting orphaned tasks for active dag runs")
- self.adopt_or_reset_orphaned_tasks()
-
self.register_signals()
# Start after resetting orphaned tasks to avoid stressing out DB.
@@ -1338,6 +1336,41 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
raise ValueError("Processor agent is not started.")
is_unit_test: bool = conf.getboolean('core', 'unit_test_mode')
+ timers = sched.scheduler()
+
+ def call_regular_interval(
+ delay: float,
+ action: Callable,
+ arguments=(),
+ kwargs={},
+ ): # pylint: disable=dangerous-default-value
+ def repeat(*args, **kwargs):
+ action(*args, **kwargs)
+ # This is not perfect. If we want a timer every 60s, but action
+ # takes 10s to run, this will run it every 70s.
+ # Good enough for now
+ timers.enter(delay, 1, repeat, args, kwargs)
+
+ timers.enter(delay, 1, repeat, arguments, kwargs)
+
+ # Check on start up, then every configured interval
+ self.adopt_or_reset_orphaned_tasks()
+
+ call_regular_interval(
+ conf.getfloat('scheduler', 'orphaned_tasks_check_interval', fallback=300.0),
+ self.adopt_or_reset_orphaned_tasks,
+ )
+
+ call_regular_interval(
+ conf.getfloat('scheduler', 'pool_metrics_interval', fallback=5.0),
+ self._emit_pool_metrics,
+ )
+
+ call_regular_interval(
+ conf.getfloat('scheduler', 'clean_tis_without_dagrun', fallback=15.0),
+ self._clean_tis_without_dagrun,
+ )
+
for loop_count in itertools.count(start=1):
loop_start_time = time.time()
@@ -1360,7 +1393,9 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
# Heartbeat the scheduler periodically
self.heartbeat(only_if_necessary=True)
- self._emit_pool_metrics()
+ # Run any pending timed events
+ next_event = timers.run(blocking=False)
+ self.log.debug("Next timed event is in %f", next_event)
loop_end_time = time.time()
loop_duration = loop_end_time - loop_start_time
@@ -1370,7 +1405,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
# If the scheduler is doing things, don't sleep. This means when there is work to do, the
# scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
# usage when "idle"
- time.sleep(self._processor_poll_interval)
+ time.sleep(min(self._processor_poll_interval, next_event))
if loop_count >= self.num_runs > 0:
self.log.info(
@@ -1388,6 +1423,29 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
)
break
+ @provide_session
+ def _clean_tis_without_dagrun(self, session):
+ with prohibit_commit(session) as guard:
+ try:
+ self._change_state_for_tis_without_dagrun(
+ old_states=[State.UP_FOR_RETRY], new_state=State.FAILED, session=session
+ )
+
+ self._change_state_for_tis_without_dagrun(
+ old_states=[State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE, State.SENSING],
+ new_state=State.NONE,
+ session=session,
+ )
+
+ guard.commit()
+ except OperationalError as e:
+ if is_lock_not_available_error(error=e):
+ self.log.debug("Lock held by another Scheduler")
+ session.rollback()
+ else:
+ raise
+ guard.commit()
+
def _do_scheduling(self, session) -> int:
"""
This function is where the main scheduling decisions take places. It:
@@ -1472,26 +1530,6 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
session.expunge_all()
# END: schedule TIs
- # TODO[HA]: Do we need to do it every time?
- try:
- self._change_state_for_tis_without_dagrun(
- old_states=[State.UP_FOR_RETRY], new_state=State.FAILED, session=session
- )
-
- self._change_state_for_tis_without_dagrun(
- old_states=[State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE, State.SENSING],
- new_state=State.NONE,
- session=session,
- )
-
- guard.commit()
- except OperationalError as e:
- if is_lock_not_available_error(error=e):
- self.log.debug("Lock held by another Scheduler")
- session.rollback()
- else:
- raise
-
try:
if self.executor.slots_available <= 0:
# We know we can't do anything here, so don't even try!
@@ -1720,6 +1758,7 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
:return: the number of TIs reset
:rtype: int
"""
+ self.log.info("Resetting orphaned tasks for active dag runs")
timeout = conf.getint('scheduler', 'scheduler_health_check_threshold')
num_failed = (
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5dd1bec..b4ac720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1919,18 +1919,21 @@ class TestSchedulerJob(unittest.TestCase):
ti.state = initial_task_state
session.commit()
- # Create scheduler and mock calls to processor. Run duration is set
- # to a high value to ensure loop is entered. Poll interval is 0 to
- # avoid sleep. Done flag is set to true to exist the loop immediately.
- scheduler = SchedulerJob(num_runs=0, processor_poll_interval=0)
+ # This poll interval is large, but the scheduler doesn't sleep that
+ # long; we hit the clean_tis_without_dagrun interval instead
+ scheduler = SchedulerJob(num_runs=2, processor_poll_interval=30)
+ scheduler.dagbag = dagbag
executor = MockExecutor(do_update=False)
executor.queued_tasks
scheduler.executor = executor
processor = mock.MagicMock()
- processor.done = True
+ processor.done = False
scheduler.processor_agent = processor
- scheduler._run_scheduler_loop()
+ with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+ {('scheduler', 'clean_tis_without_dagrun'): '0.001'}
+ ):
+ scheduler._run_scheduler_loop()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
self.assertEqual(ti.state, expected_task_state)