You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2020/11/09 12:15:18 UTC

[airflow] branch master updated: Call scheduler "book-keeping" operations less frequently. (#12139)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e405e  Call scheduler "book-keeping" operations less frequently. (#12139)
92e405e is described below

commit 92e405e72922cc569a2e41281df9d055c3a7855d
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Nov 9 12:14:24 2020 +0000

    Call scheduler "book-keeping" operations less frequently. (#12139)
    
    This change makes it so that certain operations in the scheduler are
    called on a regular interval, instead of only once at start up, or every
    time around the loop:
    
    - adopt_or_reset_orphaned_tasks (detecting SchedulerJobs that died) was
      previously only called on start up.
    - _clean_tis_without_dagrun was previously called every time around the
      scheduling loop, but it does not need to run on every iteration, as
      this is a relatively rare cleanup operation.
    - _emit_pool_metrics doesn't need to be called _every_ time around the
      loop, once every 5 seconds is enough.
    
    This uses the built in ["sched" module][sched] to handle the "timers".
    
    [sched]: https://docs.python.org/3/library/sched.html
---
 airflow/config_templates/config.yml          | 22 +++++++
 airflow/config_templates/default_airflow.cfg | 10 +++
 airflow/jobs/scheduler_job.py                | 91 ++++++++++++++++++++--------
 tests/jobs/test_scheduler_job.py             | 15 +++--
 4 files changed, 106 insertions(+), 32 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 6555c7b..ad75309 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1572,6 +1572,14 @@
       type: string
       example: ~
       default: "5"
+    - name: clean_tis_without_dagrun
+      description: |
+        How often (in seconds) to check and tidy up 'running' TaskInstances
+        that no longer have a matching DagRun
+      version_added: 2.0.0
+      type: float
+      example: ~
+      default: "15.0"
     - name: scheduler_heartbeat_sec
       description: |
         The scheduler constantly tries to trigger new tasks (look at the
@@ -1617,6 +1625,13 @@
       type: string
       example: ~
       default: "30"
+    - name: pool_metrics_interval
+      description: |
+        How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled)
+      version_added: 2.0.0
+      type: float
+      example: ~
+      default: "5.0"
     - name: scheduler_health_check_threshold
       description: |
         If the last scheduler heartbeat happened more than scheduler_health_check_threshold
@@ -1626,6 +1641,13 @@
       type: string
       example: ~
       default: "30"
+    - name: orphaned_tasks_check_interval
+      description: |
+        How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
+      version_added: 2.0.0
+      type: float
+      example: ~
+      default: "300.0"
     - name: child_process_log_directory
       description: ~
       version_added: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index c2bbb64..bcd3b89 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -781,6 +781,10 @@ tls_key =
 # listen (in seconds).
 job_heartbeat_sec = 5
 
+# How often (in seconds) to check and tidy up 'running' TaskInstances
+# that no longer have a matching DagRun
+clean_tis_without_dagrun = 15.0
+
 # The scheduler constantly tries to trigger new tasks (look at the
 # scheduler section in the docs for more information). This defines
 # how often the scheduler should run (in seconds).
@@ -802,10 +806,16 @@ dag_dir_list_interval = 300
 # How often should stats be printed to the logs. Setting to 0 will disable printing stats
 print_stats_interval = 30
 
+# How often (in seconds) should pool usage stats be sent to statsd (if statsd_on is enabled)
+pool_metrics_interval = 5.0
+
 # If the last scheduler heartbeat happened more than scheduler_health_check_threshold
 # ago (in seconds), scheduler is considered unhealthy.
 # This is used by the health check in the "/health" endpoint
 scheduler_health_check_threshold = 30
+
+# How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
+orphaned_tasks_check_interval = 300.0
 child_process_log_directory = {AIRFLOW_HOME}/logs/scheduler
 
 # Local task jobs periodically heartbeat to the DB. If the job has
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b454aba..7d1aa72 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -22,6 +22,7 @@ import itertools
 import logging
 import multiprocessing
 import os
+import sched
 import signal
 import sys
 import threading
@@ -30,7 +31,7 @@ from collections import defaultdict
 from contextlib import ExitStack, redirect_stderr, redirect_stdout, suppress
 from datetime import timedelta
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import Any, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
 
 from setproctitle import setproctitle
 from sqlalchemy import and_, func, not_, or_
@@ -1272,9 +1273,6 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             self.executor.job_id = self.id
             self.executor.start()
 
-            self.log.info("Resetting orphaned tasks for active dag runs")
-            self.adopt_or_reset_orphaned_tasks()
-
             self.register_signals()
 
             # Start after resetting orphaned tasks to avoid stressing out DB.
@@ -1338,6 +1336,41 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             raise ValueError("Processor agent is not started.")
         is_unit_test: bool = conf.getboolean('core', 'unit_test_mode')
 
+        timers = sched.scheduler()
+
+        def call_regular_interval(
+            delay: float,
+            action: Callable,
+            arguments=(),
+            kwargs={},
+        ):  # pylint: disable=dangerous-default-value
+            def repeat(*args, **kwargs):
+                action(*args, **kwargs)
+                # This is not perfect. If we want a timer every 60s, but action
+                # takes 10s to run, this will run it every 70s.
+                # Good enough for now
+                timers.enter(delay, 1, repeat, args, kwargs)
+
+            timers.enter(delay, 1, repeat, arguments, kwargs)
+
+        # Check on start up, then every configured interval
+        self.adopt_or_reset_orphaned_tasks()
+
+        call_regular_interval(
+            conf.getfloat('scheduler', 'orphaned_tasks_check_interval', fallback=300.0),
+            self.adopt_or_reset_orphaned_tasks,
+        )
+
+        call_regular_interval(
+            conf.getfloat('scheduler', 'pool_metrics_interval', fallback=5.0),
+            self._emit_pool_metrics,
+        )
+
+        call_regular_interval(
+            conf.getfloat('scheduler', 'clean_tis_without_dagrun', fallback=15.0),
+            self._clean_tis_without_dagrun,
+        )
+
         for loop_count in itertools.count(start=1):
             loop_start_time = time.time()
 
@@ -1360,7 +1393,9 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             # Heartbeat the scheduler periodically
             self.heartbeat(only_if_necessary=True)
 
-            self._emit_pool_metrics()
+            # Run any pending timed events
+            next_event = timers.run(blocking=False)
+            self.log.debug("Next timed event is in %f", next_event)
 
             loop_end_time = time.time()
             loop_duration = loop_end_time - loop_start_time
@@ -1370,7 +1405,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 # If the scheduler is doing things, don't sleep. This means when there is work to do, the
                 # scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
                 # usage when "idle"
-                time.sleep(self._processor_poll_interval)
+                time.sleep(min(self._processor_poll_interval, next_event))
 
             if loop_count >= self.num_runs > 0:
                 self.log.info(
@@ -1388,6 +1423,29 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
                 )
                 break
 
+    @provide_session
+    def _clean_tis_without_dagrun(self, session):
+        with prohibit_commit(session) as guard:
+            try:
+                self._change_state_for_tis_without_dagrun(
+                    old_states=[State.UP_FOR_RETRY], new_state=State.FAILED, session=session
+                )
+
+                self._change_state_for_tis_without_dagrun(
+                    old_states=[State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE, State.SENSING],
+                    new_state=State.NONE,
+                    session=session,
+                )
+
+                guard.commit()
+            except OperationalError as e:
+                if is_lock_not_available_error(error=e):
+                    self.log.debug("Lock held by another Scheduler")
+                    session.rollback()
+                else:
+                    raise
+            guard.commit()
+
     def _do_scheduling(self, session) -> int:
         """
         This function is where the main scheduling decisions take places. It:
@@ -1472,26 +1530,6 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             session.expunge_all()
             # END: schedule TIs
 
-            # TODO[HA]: Do we need to do it every time?
-            try:
-                self._change_state_for_tis_without_dagrun(
-                    old_states=[State.UP_FOR_RETRY], new_state=State.FAILED, session=session
-                )
-
-                self._change_state_for_tis_without_dagrun(
-                    old_states=[State.QUEUED, State.SCHEDULED, State.UP_FOR_RESCHEDULE, State.SENSING],
-                    new_state=State.NONE,
-                    session=session,
-                )
-
-                guard.commit()
-            except OperationalError as e:
-                if is_lock_not_available_error(error=e):
-                    self.log.debug("Lock held by another Scheduler")
-                    session.rollback()
-                else:
-                    raise
-
             try:
                 if self.executor.slots_available <= 0:
                     # We know we can't do anything here, so don't even try!
@@ -1720,6 +1758,7 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
         :return: the number of TIs reset
         :rtype: int
         """
+        self.log.info("Resetting orphaned tasks for active dag runs")
         timeout = conf.getint('scheduler', 'scheduler_health_check_threshold')
 
         num_failed = (
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 5dd1bec..b4ac720 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1919,18 +1919,21 @@ class TestSchedulerJob(unittest.TestCase):
         ti.state = initial_task_state
         session.commit()
 
-        # Create scheduler and mock calls to processor. Run duration is set
-        # to a high value to ensure loop is entered. Poll interval is 0 to
-        # avoid sleep. Done flag is set to true to exist the loop immediately.
-        scheduler = SchedulerJob(num_runs=0, processor_poll_interval=0)
+        # This poll interval is large, but the scheduler doesn't sleep that
+        # long; we hit the clean_tis_without_dagrun interval instead
+        scheduler = SchedulerJob(num_runs=2, processor_poll_interval=30)
+        scheduler.dagbag = dagbag
         executor = MockExecutor(do_update=False)
         executor.queued_tasks
         scheduler.executor = executor
         processor = mock.MagicMock()
-        processor.done = True
+        processor.done = False
         scheduler.processor_agent = processor
 
-        scheduler._run_scheduler_loop()
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), conf_vars(
+            {('scheduler', 'clean_tis_without_dagrun'): '0.001'}
+        ):
+            scheduler._run_scheduler_loop()
 
         ti = dr.get_task_instance(task_id=op1.task_id, session=session)
         self.assertEqual(ti.state, expected_task_state)