Posted to commits@aurora.apache.org by se...@apache.org on 2017/03/21 10:16:59 UTC

aurora git commit: Make Thermos observer resource collection intervals configurable

Repository: aurora
Updated Branches:
  refs/heads/master b8f72d146 -> 33acb899b


Make Thermos observer resource collection intervals configurable

We have noticed that on hosts with many active tasks (~100) the observer UI
becomes unusable: the Thermos observer fully utilizes one core but does not
respond to any requests.

Dumping `/threads` indicates that the observer might be backlogged by the
roughly one hundred concurrent `TaskResourceMonitor` threads (one per active
task). Because of the Python GIL, however, only one of them can make progress
at a time.

This patch adds options to control the resource collection intervals, giving
operators a way to reduce the CPU pressure.
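
A rough way to reason about the trade-off: because the GIL serializes the
Python-level collection work, the per-task costs add up on a single core. The
sketch below is a hypothetical back-of-envelope model, not code from the
observer; the helper names and the per-collection cost are assumptions chosen
only for illustration.

```python
# Hypothetical model: under the GIL, per-task collections effectively run one
# after another, so their costs add up on a single core.

def collection_load(num_tasks, secs_per_collection, interval_secs):
    """Fraction of one core spent on resource collection.

    Assumes each of `num_tasks` monitor threads does `secs_per_collection`
    seconds of GIL-holding work once every `interval_secs` seconds.
    """
    if interval_secs <= 0:
        raise ValueError("interval_secs must be positive")
    return num_tasks * secs_per_collection / float(interval_secs)


def min_interval_secs(num_tasks, secs_per_collection, max_load=0.5):
    """Smallest interval that keeps the collection load at or below max_load."""
    return num_tasks * secs_per_collection / float(max_load)


# Hypothetical numbers, not measurements: 100 tasks, 0.2 s of work per
# collection, and the default 20 s process collection interval.
print(collection_load(100, 0.2, 20))  # 1.0, i.e. one core fully busy
print(min_interval_secs(100, 0.2))    # 40.0
```

Under this model, doubling the interval via the new command line options halves
the load, which is the lever this patch gives operators.
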

Testing Done:
./pants test.pytest src/{test,main}/python:: -- -v

Bugs closed: AURORA-1907

Reviewed at https://reviews.apache.org/r/57757/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/33acb899
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/33acb899
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/33acb899

Branch: refs/heads/master
Commit: 33acb899b8cbfd9914f028524cdd9428beeb06e3
Parents: b8f72d1
Author: Stephan Erb <se...@apache.org>
Authored: Tue Mar 21 09:29:41 2017 +0100
Committer: Stephan Erb <se...@apache.org>
Committed: Tue Mar 21 09:29:41 2017 +0100

----------------------------------------------------------------------
 RELEASE-NOTES.md                                | 12 +--
 docs/README.md                                  |  1 +
 docs/reference/observer-configuration.md        | 89 ++++++++++++++++++++
 .../apache/aurora/tools/thermos_observer.py     | 24 +++++-
 .../apache/thermos/monitoring/resource.py       | 25 ++++--
 .../apache/thermos/observer/task_observer.py    | 21 +++--
 .../tools/test_thermos_observer_entry_point.py  | 40 ---------
 7 files changed, 149 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index d58d2bd..5babea5 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,12 +1,12 @@
-0.18.0
-=====
+0.18.0 (unreleased)
+===================
 
 ### New/updated:
 
 - Add message parameter to `killTasks` RPC.
-- Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information.
+- Add `prune_tasks` endpoint to `aurora_admin`. See `aurora_admin prune_tasks -h` for usage information.
 - Add support for per-task volume mounts for Mesos containers to the Aurora config DSL.
-* Added the `-mesos_driver` flag to the scheduler with three possible options:
+- Added the `-mesos_driver` flag to the scheduler with three possible options:
   `SCHEDULER_DRIVER`, `V0_MESOS`, `V1_MESOS`. The first uses the original driver
   and the latter two use two new drivers from `libmesos`. `V0_MESOS` uses the
   `SCHEDULER_DRIVER` under the hood and `V1_MESOS` uses a new HTTP API aware
@@ -14,7 +14,9 @@
   Performance sensitive users should stick with the `SCHEDULER_DRIVER` or
   `V0_MESOS` drivers.
 - Add support for new MesosContainerizer rolled out in Mesos 1.2.0.
-* Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+  Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+- Add observer command line options to control the resource collection interval
+  for observed tasks. See [here](docs/reference/observer-configuration.md) for details.
 
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 1d679e2..dfd3a23 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -54,6 +54,7 @@ The complete reference of commands, configuration options, and scheduler interna
     - [Client Hooks](reference/client-hooks.md)
     - [Client Cluster Configuration](reference/client-cluster-configuration.md)
  * [Scheduler Configuration](reference/scheduler-configuration.md)
+ * [Observer Configuration](reference/observer-configuration.md)
 
 ## Additional Resources
  * [Tools integrating with Aurora](additional-resources/tools.md)

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/reference/observer-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/observer-configuration.md b/docs/reference/observer-configuration.md
new file mode 100644
index 0000000..8a443c9
--- /dev/null
+++ b/docs/reference/observer-configuration.md
@@ -0,0 +1,89 @@
+# Observer Configuration Reference
+
+The Aurora/Thermos observer can take a variety of configuration options through command-line arguments.
+A list of the available options can be seen by running `thermos_observer --long-help`.
+
+Please refer to the [Operator Configuration Guide](../operations/configuration.md) for details on how
+to properly set the most important options.
+
+```
+$ thermos_observer.pex --long-help
+Options:
+  -h, --help, --short-help
+                        show this help message and exit.
+  --long-help           show options from all registered modules, not just the
+                        __main__ module.
+  --mesos-root=MESOS_ROOT
+                        The mesos root directory to search for Thermos
+                        executor sandboxes [default: /var/lib/mesos]
+  --ip=IP               The IP address the observer will bind to. [default:
+                        0.0.0.0]
+  --port=PORT           The port on which the observer should listen.
+                        [default: 1338]
+  --polling_interval_secs=POLLING_INTERVAL_SECS
+                        The number of seconds between observer refresh
+                        attempts. [default: 5]
+  --task_process_collection_interval_secs=TASK_PROCESS_COLLECTION_INTERVAL_SECS
+                        The number of seconds between per task process
+                        resource collections. [default: 20]
+  --task_disk_collection_interval_secs=TASK_DISK_COLLECTION_INTERVAL_SECS
+                        The number of seconds between per task disk resource
+                        collections. [default: 60]
+
+  From module twitter.common.app:
+    --app_daemonize     Daemonize this application. [default: False]
+    --app_profile_output=FILENAME
+                        Dump the profiling output to a binary profiling
+                        format. [default: None]
+    --app_daemon_stderr=TWITTER_COMMON_APP_DAEMON_STDERR
+                        Direct this app's stderr to this file if daemonized.
+                        [default: /dev/null]
+    --app_debug         Print extra debugging information during application
+                        initialization. [default: False]
+    --app_rc_filename   Print the filename for the rc file and quit. [default:
+                        False]
+    --app_daemon_stdout=TWITTER_COMMON_APP_DAEMON_STDOUT
+                        Direct this app's stdout to this file if daemonized.
+                        [default: /dev/null]
+    --app_profiling     Run profiler on the code while it runs.  Note this can
+                        cause slowdowns. [default: False]
+    --app_ignore_rc_file
+                        Ignore default arguments from the rc file. [default:
+                        False]
+    --app_pidfile=TWITTER_COMMON_APP_PIDFILE
+                        The pidfile to use if --app_daemonize is specified.
+                        [default: None]
+
+  From module twitter.common.log.options:
+    --log_to_stdout=[scheme:]LEVEL
+                        OBSOLETE - legacy flag, use --log_to_stderr instead.
+                        [default: ERROR]
+    --log_to_stderr=[scheme:]LEVEL
+                        The level at which logging to stderr [default: ERROR].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --log_to_disk=[scheme:]LEVEL
+                        The level at which logging to disk [default: INFO].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --log_dir=DIR       The directory into which log files will be generated
+                        [default: /var/tmp].
+    --log_simple        Write a single log file rather than one log file per
+                        log level [default: False].
+    --log_to_scribe=[scheme:]LEVEL
+                        The level at which logging to scribe [default: NONE].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --scribe_category=CATEGORY
+                        The category used when logging to the scribe daemon.
+                        [default: python_default].
+    --scribe_buffer     Buffer messages when scribe is unavailable rather than
+                        dropping them. [default: False].
+    --scribe_host=HOST  The host running the scribe daemon. [default:
+                        localhost].
+    --scribe_port=PORT  The port used to connect to the scribe daemon.
+                        [default: 1463].
+```

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/aurora/tools/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py b/src/main/python/apache/aurora/tools/thermos_observer.py
index 4bba019..0318f99 100644
--- a/src/main/python/apache/aurora/tools/thermos_observer.py
+++ b/src/main/python/apache/aurora/tools/thermos_observer.py
@@ -25,6 +25,7 @@ from twitter.common.log.options import LogOptions
 from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.path_detector import MesosPathDetector
+from apache.thermos.monitoring.resource import TaskResourceMonitor
 from apache.thermos.observer.http.configure import configure_server
 from apache.thermos.observer.task_observer import TaskObserver
 
@@ -60,6 +61,22 @@ app.add_option(
       help='The number of seconds between observer refresh attempts.')
 
 
+app.add_option(
+    '--task_process_collection_interval_secs',
+      dest='task_process_collection_interval_secs',
+      type='int',
+      default=int(TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+      help='The number of seconds between per task process resource collections.')
+
+
+app.add_option(
+    '--task_disk_collection_interval_secs',
+      dest='task_disk_collection_interval_secs',
+      type='int',
+      default=int(TaskResourceMonitor.DISK_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+      help='The number of seconds between per task disk resource collections.')
+
+
 # Allow an interruptible sleep so that ^C works.
 def sleep_forever():
   while True:
@@ -68,8 +85,11 @@ def sleep_forever():
 
 def initialize(options):
   path_detector = MesosPathDetector(options.mesos_root)
-  polling_interval = Amount(options.polling_interval_secs, Time.SECONDS)
-  return TaskObserver(path_detector, interval=polling_interval)
+  return TaskObserver(
+      path_detector,
+      Amount(options.polling_interval_secs, Time.SECONDS),
+      Amount(options.task_process_collection_interval_secs, Time.SECONDS),
+      Amount(options.task_disk_collection_interval_secs, Time.SECONDS))
 
 
 def handle_error(exc_type, value, traceback):

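The two new flags are plain integer options whose defaults come from the
`TaskResourceMonitor` class constants; `initialize()` then wraps each value in
`Amount(..., Time.SECONDS)`. The sketch below reproduces the parsing step with
the standard library's `optparse` standing in for `twitter.common.app`, and
returns plain seconds instead of `Amount` objects. `build_parser` and
`collection_intervals` are hypothetical helper names.

```python
import optparse

# Stand-ins for TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL and
# DISK_COLLECTION_INTERVAL (20 s and 60 s in this patch), as plain seconds.
PROCESS_COLLECTION_INTERVAL_SECS = 20
DISK_COLLECTION_INTERVAL_SECS = 60


def build_parser():
    parser = optparse.OptionParser()
    parser.add_option(
        '--polling_interval_secs', dest='polling_interval_secs',
        type='int', default=5,
        help='The number of seconds between observer refresh attempts.')
    parser.add_option(
        '--task_process_collection_interval_secs',
        dest='task_process_collection_interval_secs',
        type='int', default=PROCESS_COLLECTION_INTERVAL_SECS,
        help='The number of seconds between per task process resource collections.')
    parser.add_option(
        '--task_disk_collection_interval_secs',
        dest='task_disk_collection_interval_secs',
        type='int', default=DISK_COLLECTION_INTERVAL_SECS,
        help='The number of seconds between per task disk resource collections.')
    return parser


def collection_intervals(argv):
    """Return (polling, process, disk) intervals in seconds, in initialize() order."""
    options, _ = build_parser().parse_args(argv)
    return (options.polling_interval_secs,
            options.task_process_collection_interval_secs,
            options.task_disk_collection_interval_secs)


print(collection_intervals([]))  # (5, 20, 60)
print(collection_intervals(['--task_process_collection_interval_secs=60']))  # (5, 60, 60)
```

Deriving the defaults from the class constants keeps the command line defaults
and the library defaults from drifting apart.
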
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 53d0ff1..4346666 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -118,15 +118,17 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
   """
 
   MAX_HISTORY = 10000  # magic number
+  PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
+  DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
+  HISTORY_TIME = Amount(1, Time.HOURS)
 
   def __init__(self,
                task_id,
                task_monitor,
-               process_collector=ProcessTreeCollector,
                disk_collector=DiskCollector,
-               process_collection_interval=Amount(20, Time.SECONDS),
-               disk_collection_interval=Amount(1, Time.MINUTES),
-               history_time=Amount(1, Time.HOURS)):
+               process_collection_interval=PROCESS_COLLECTION_INTERVAL,
+               disk_collection_interval=DISK_COLLECTION_INTERVAL,
+               history_time=HISTORY_TIME):
     """
       task_monitor: TaskMonitor object specifying the task whose resources should be monitored
       sandbox: Directory for which to monitor disk utilisation
@@ -135,7 +137,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     self._task_id = task_id
     log.debug('Initialising resource collection for task %s' % self._task_id)
     self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
-    self._process_collector_factory = process_collector
     self._disk_collector_class = disk_collector
     self._disk_collector = None
     self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
@@ -167,7 +168,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     else:
       # Since this might be called out of band (before the main loop is aware of the process)
       if process not in self._process_collectors:
-        self._process_collectors[process] = self._process_collector_factory(process.pid)
+        self._process_collectors[process] = ProcessTreeCollector(process.pid)
 
       self._process_collectors[process].sample()
       return self._process_collectors[process].value
@@ -189,7 +190,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     next_disk_collection = 0
 
     while not self._kill_signal.is_set():
-
       now = time.time()
 
       if now > next_process_collection:
@@ -199,7 +199,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
         for process in current - actives:
           self._process_collectors.pop(process)
         for process in actives - current:
-          self._process_collectors[process] = self._process_collector_factory(process.pid)
+          self._process_collectors[process] = ProcessTreeCollector(process.pid)
         for process, collector in self._process_collectors.items():
           collector.sample()
 
@@ -223,6 +223,9 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       except ValueError as err:
         log.warning("Error recording resource sample: %s" % err)
 
+      log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % (
+          self._task_id, (time.time() - now)))
+
       # Sleep until any of the following conditions are met:
       # - it's time for the next disk collection
       # - it's time for the next process collection
@@ -236,6 +239,10 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       else:
         waiter = self._kill_signal
 
-      waiter.wait(timeout=max(0, next_collection))
+      if next_collection > 0:
+        waiter.wait(timeout=next_collection)
+      else:
+        log.warning('Task resource collection is backlogged. Consider increasing '
+                    'process_collection_interval and disk_collection_interval.')
 
     log.debug('Stopping resource monitoring for task "%s"' % self._task_id)

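The new sleep logic at the end of the collection loop can be sketched in
isolation. This is a simplified, standalone approximation: it assumes the two
deadlines are absolute timestamps and always waits on the kill signal, whereas
the real loop chooses its `waiter` in code this hunk does not show.
`wait_for_next_collection` is a hypothetical helper name.

```python
import logging
import threading
import time

log = logging.getLogger(__name__)


def wait_for_next_collection(kill_signal, next_process_collection,
                             next_disk_collection, now=None):
    """Sleep until the earlier of the two deadlines, or warn if already late.

    Returns True if it waited, False if collection is backlogged.
    """
    now = time.time() if now is None else now
    next_collection = min(next_process_collection, next_disk_collection) - now
    if next_collection > 0:
        # Event.wait returns early if kill_signal is set (e.g. on shutdown).
        kill_signal.wait(timeout=next_collection)
        return True
    # The deadline has already passed: the previous collection took longer
    # than the interval, so skip the sleep and surface the backlog.
    log.warning('Task resource collection is backlogged. Consider increasing '
                'process_collection_interval and disk_collection_interval.')
    return False
```

Previously `max(0, next_collection)` turned an overdue deadline into a zero
timeout without any signal to the operator; the warning makes that backlog
visible in the logs.
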
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index 1485de8..4bb5d23 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -21,6 +21,7 @@ polls a designated Thermos checkpoint root and collates information about all ta
 """
 import os
 import threading
+import time
 from operator import attrgetter
 
 from twitter.common import log
@@ -31,7 +32,7 @@ from twitter.common.quantity import Amount, Time
 from apache.thermos.common.path import TaskPath
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
-from apache.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
+from apache.thermos.monitoring.resource import TaskResourceMonitor
 
 from .detector import ObserverTaskDetector
 from .observed_task import ActiveObservedTask, FinishedObservedTask
@@ -55,17 +56,17 @@ class TaskObserver(ExceptionalThread, Lockable):
 
   def __init__(self,
                path_detector,
-               resource_monitor_class=TaskResourceMonitor,
-               interval=POLLING_INTERVAL):
+               interval=POLLING_INTERVAL,
+               task_process_collection_interval=TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL,
+               task_disk_collection_interval=TaskResourceMonitor.DISK_COLLECTION_INTERVAL):
     self._detector = ObserverTaskDetector(
         path_detector,
         self.__on_active,
         self.__on_finished,
         self.__on_removed)
-    if not issubclass(resource_monitor_class, ResourceMonitorBase):
-      raise ValueError("resource monitor class must implement ResourceMonitorBase!")
-    self._resource_monitor_class = resource_monitor_class
     self._interval = interval
+    self._task_process_collection_interval = task_process_collection_interval
+    self._task_disk_collection_interval = task_disk_collection_interval
     self._active_tasks = {}    # task_id => ActiveObservedTask
     self._finished_tasks = {}  # task_id => FinishedObservedTask
     self._stop_event = threading.Event()
@@ -100,7 +101,11 @@ class TaskObserver(ExceptionalThread, Lockable):
       log.error('Found an active task (%s) in finished tasks?' % task_id)
       return
     task_monitor = TaskMonitor(root, task_id)
-    resource_monitor = self._resource_monitor_class(task_id, task_monitor)
+    resource_monitor = TaskResourceMonitor(
+        task_id,
+        task_monitor,
+        process_collection_interval=self._task_process_collection_interval,
+        disk_collection_interval=self._task_disk_collection_interval)
     resource_monitor.start()
     self._active_tasks[task_id] = ActiveObservedTask(
         root,
@@ -132,7 +137,9 @@ class TaskObserver(ExceptionalThread, Lockable):
     while not self._stop_event.is_set():
       self._stop_event.wait(self._interval.as_(Time.SECONDS))
       with self.lock:
+        start = time.time()
         self._detector.refresh()
+        log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start))
 
   @Lockable.sync
   def process_from_name(self, task_id, process_id):

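The `TaskObserver` change drops the injectable `resource_monitor_class` and
instead stores two intervals that it forwards to every `TaskResourceMonitor` it
creates for a newly active task. The sketch below shows that pattern with
hypothetical stand-ins (`StubResourceMonitor`, `SketchObserver`), plain seconds
instead of `Amount` values, and a plain lock in place of `Lockable`; it does not
start threads or read checkpoints.

```python
import threading


class StubResourceMonitor(object):
    """Hypothetical stand-in for TaskResourceMonitor; it only records its arguments."""

    def __init__(self, task_id, task_monitor, process_collection_interval=20,
                 disk_collection_interval=60):
        self.task_id = task_id
        self.task_monitor = task_monitor
        self.process_collection_interval = process_collection_interval
        self.disk_collection_interval = disk_collection_interval


class SketchObserver(object):
    """Keeps the operator-supplied intervals and applies them to every new task."""

    def __init__(self, task_process_collection_interval=20,
                 task_disk_collection_interval=60):
        self._task_process_collection_interval = task_process_collection_interval
        self._task_disk_collection_interval = task_disk_collection_interval
        self._active_tasks = {}  # task_id => StubResourceMonitor
        self._lock = threading.Lock()

    def on_active(self, task_id, task_monitor=None):
        # Every per-task monitor receives the same configured intervals.
        with self._lock:
            monitor = StubResourceMonitor(
                task_id,
                task_monitor,
                process_collection_interval=self._task_process_collection_interval,
                disk_collection_interval=self._task_disk_collection_interval)
            self._active_tasks[task_id] = monitor
            return monitor
```

Passing the intervals rather than a monitor class narrows the constructor to
what operators actually need to tune, at the cost of the test-time injection
point that the deleted entry point test relied on.
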
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
deleted file mode 100644
index e1c8dec..0000000
--- a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import os
-import unittest
-
-from mock import Mock, create_autospec, patch
-from twitter.common.quantity import Amount, Time
-
-from apache.aurora.tools.thermos_observer import initialize
-from apache.thermos.observer.task_observer import TaskObserver
-
-
-class ThermosObserverMainTest(unittest.TestCase):
-  def test_initialize(self):
-    expected_interval = Amount(15, Time.SECONDS)
-    mock_options = Mock(spec_set=['root', 'mesos_root', 'polling_interval_secs'])
-    mock_options.root = ''
-    mock_options.mesos_root = os.path.abspath('.')
-    mock_options.polling_interval_secs = int(expected_interval.as_(Time.SECONDS))
-    mock_task_observer = create_autospec(spec=TaskObserver)
-    with patch(
-        'apache.aurora.tools.thermos_observer.TaskObserver',
-        return_value=mock_task_observer) as mock_observer:
-
-      initialize(mock_options)
-
-      assert len(mock_observer.mock_calls) == 1
-      args = mock_observer.mock_calls[0][2]
-      assert expected_interval == args['interval']