Posted to commits@aurora.apache.org by se...@apache.org on 2017/03/21 10:16:59 UTC
aurora git commit: Make Thermos observer resource collection intervals configurable
Repository: aurora
Updated Branches:
refs/heads/master b8f72d146 -> 33acb899b
Make Thermos observer resource collection intervals configurable
We have noticed that on hosts with many active tasks (~100) the observer UI
becomes unusable: the observer fully utilizes one CPU core but does not serve
any requests. A dump of `/threads` suggests the observer is backlogged by the
roughly one hundred concurrent `TaskResourceMonitor` threads, of which only one
can make progress at a time because of the Python GIL.
This patch adds options to control the resource collection intervals, allowing
operators to reduce the CPU pressure.
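As a rough illustration of why longer intervals help: if the per-task monitor threads are serialized by the GIL, the share of one core they demand is approximately the number of tasks times the CPU cost of each collection divided by its interval, summed over process and disk collection. The sketch below assumes this simplified model (it ignores I/O waits, during which the GIL is released), `gil_core_demand` is a hypothetical helper, and the per-collection costs in the example are placeholders, not measurements from the affected hosts.

```python
def gil_core_demand(num_tasks, process_cost_secs, process_interval_secs,
                    disk_cost_secs, disk_interval_secs):
  """Estimate the share of one GIL-bound core the monitor threads need.

  Simplified model (an assumption): every task runs one monitor thread, each
  collection burns a fixed amount of CPU while holding the GIL, and threads
  never overlap. A result of 1.0 or more means collections cannot keep up.
  """
  per_task = (process_cost_secs / process_interval_secs +
              disk_cost_secs / disk_interval_secs)
  return num_tasks * per_task


# Hypothetical costs: 0.25s per process collection, 0.6s per disk collection.
default_demand = gil_core_demand(100, 0.25, 20, 0.6, 60)   # about 2.25: backlogged
relaxed_demand = gil_core_demand(100, 0.25, 60, 0.6, 300)  # about 0.62: keeps up
```

Under this model the demand scales inversely with the intervals, which is why exposing them as operator-tunable options is enough to relieve a host whose task count pushes the monitors past one core.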
Testing Done:
./pants test.pytest src/{test,main}/python:: -- -v
Bugs closed: AURORA-1907
Reviewed at https://reviews.apache.org/r/57757/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/33acb899
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/33acb899
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/33acb899
Branch: refs/heads/master
Commit: 33acb899b8cbfd9914f028524cdd9428beeb06e3
Parents: b8f72d1
Author: Stephan Erb <se...@apache.org>
Authored: Tue Mar 21 09:29:41 2017 +0100
Committer: Stephan Erb <se...@apache.org>
Committed: Tue Mar 21 09:29:41 2017 +0100
----------------------------------------------------------------------
RELEASE-NOTES.md | 12 +--
docs/README.md | 1 +
docs/reference/observer-configuration.md | 89 ++++++++++++++++++++
.../apache/aurora/tools/thermos_observer.py | 24 +++++-
.../apache/thermos/monitoring/resource.py | 25 ++++--
.../apache/thermos/observer/task_observer.py | 21 +++--
.../tools/test_thermos_observer_entry_point.py | 40 ---------
7 files changed, 149 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index d58d2bd..5babea5 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,12 +1,12 @@
-0.18.0
-=====
+0.18.0 (unreleased)
+===================
### New/updated:
- Add message parameter to `killTasks` RPC.
-- Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information.
+- Add `prune_tasks` endpoint to `aurora_admin`. See `aurora_admin prune_tasks -h` for usage information.
- Add support for per-task volume mounts for Mesos containers to the Aurora config DSL.
-* Added the `-mesos_driver` flag to the scheduler with three possible options:
+- Added the `-mesos_driver` flag to the scheduler with three possible options:
`SCHEDULER_DRIVER`, `V0_MESOS`, `V1_MESOS`. The first uses the original driver
and the latter two use two new drivers from `libmesos`. `V0_MESOS` uses the
`SCHEDULER_DRIVER` under the hood and `V1_MESOS` uses a new HTTP API aware
@@ -14,7 +14,9 @@
Performance sensitive users should stick with the `SCHEDULER_DRIVER` or
`V0_MESOS` drivers.
- Add support for new MesosContainerizer rolled out in Mesos 1.2.0.
-* Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+ Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+- Add observer command line options to control the resource collection interval
+ for observed tasks. See [here](docs/reference/observer-configuration.md) for details.
0.17.0
======
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 1d679e2..dfd3a23 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -54,6 +54,7 @@ The complete reference of commands, configuration options, and scheduler interna
- [Client Hooks](reference/client-hooks.md)
- [Client Cluster Configuration](reference/client-cluster-configuration.md)
* [Scheduler Configuration](reference/scheduler-configuration.md)
+ * [Observer Configuration](reference/observer-configuration.md)
## Additional Resources
* [Tools integrating with Aurora](additional-resources/tools.md)
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/reference/observer-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/observer-configuration.md b/docs/reference/observer-configuration.md
new file mode 100644
index 0000000..8a443c9
--- /dev/null
+++ b/docs/reference/observer-configuration.md
@@ -0,0 +1,89 @@
+# Observer Configuration Reference
+
+The Aurora/Thermos observer can take a variety of configuration options through command-line arguments.
+A list of the available options can be seen by running `thermos_observer --long-help`.
+
+Please refer to the [Operator Configuration Guide](../operations/configuration.md) for details on how
+to properly set the most important options.
+
+```
+$ thermos_observer.pex --long-help
+Options:
+ -h, --help, --short-help
+ show this help message and exit.
+ --long-help show options from all registered modules, not just the
+ __main__ module.
+ --mesos-root=MESOS_ROOT
+ The mesos root directory to search for Thermos
+ executor sandboxes [default: /var/lib/mesos]
+ --ip=IP The IP address the observer will bind to. [default:
+ 0.0.0.0]
+ --port=PORT The port on which the observer should listen.
+ [default: 1338]
+ --polling_interval_secs=POLLING_INTERVAL_SECS
+ The number of seconds between observer refresh
+ attempts. [default: 5]
+ --task_process_collection_interval_secs=TASK_PROCESS_COLLECTION_INTERVAL_SECS
+ The number of seconds between per task process
+ resource collections. [default: 20]
+ --task_disk_collection_interval_secs=TASK_DISK_COLLECTION_INTERVAL_SECS
+ The number of seconds between per task disk resource
+ collections. [default: 60]
+
+ From module twitter.common.app:
+ --app_daemonize Daemonize this application. [default: False]
+ --app_profile_output=FILENAME
+ Dump the profiling output to a binary profiling
+ format. [default: None]
+ --app_daemon_stderr=TWITTER_COMMON_APP_DAEMON_STDERR
+ Direct this app's stderr to this file if daemonized.
+ [default: /dev/null]
+ --app_debug Print extra debugging information during application
+ initialization. [default: False]
+ --app_rc_filename Print the filename for the rc file and quit. [default:
+ False]
+ --app_daemon_stdout=TWITTER_COMMON_APP_DAEMON_STDOUT
+ Direct this app's stdout to this file if daemonized.
+ [default: /dev/null]
+ --app_profiling Run profiler on the code while it runs. Note this can
+ cause slowdowns. [default: False]
+ --app_ignore_rc_file
+ Ignore default arguments from the rc file. [default:
+ False]
+ --app_pidfile=TWITTER_COMMON_APP_PIDFILE
+ The pidfile to use if --app_daemonize is specified.
+ [default: None]
+
+ From module twitter.common.log.options:
+ --log_to_stdout=[scheme:]LEVEL
+ OBSOLETE - legacy flag, use --log_to_stderr instead.
+ [default: ERROR]
+ --log_to_stderr=[scheme:]LEVEL
+ The level at which logging to stderr [default: ERROR].
+ Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+ of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+ and scheme is one of ['google', 'plain'].
+ --log_to_disk=[scheme:]LEVEL
+ The level at which logging to disk [default: INFO].
+ Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+ of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+ and scheme is one of ['google', 'plain'].
+ --log_dir=DIR The directory into which log files will be generated
+ [default: /var/tmp].
+ --log_simple Write a single log file rather than one log file per
+ log level [default: False].
+ --log_to_scribe=[scheme:]LEVEL
+ The level at which logging to scribe [default: NONE].
+ Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+ of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+ and scheme is one of ['google', 'plain'].
+ --scribe_category=CATEGORY
+ The category used when logging to the scribe daemon.
+ [default: python_default].
+ --scribe_buffer Buffer messages when scribe is unavailable rather than
+ dropping them. [default: False].
+ --scribe_host=HOST The host running the scribe daemon. [default:
+ localhost].
+ --scribe_port=PORT The port used to connect to the scribe daemon.
+ [default: 1463].
+```
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/aurora/tools/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py b/src/main/python/apache/aurora/tools/thermos_observer.py
index 4bba019..0318f99 100644
--- a/src/main/python/apache/aurora/tools/thermos_observer.py
+++ b/src/main/python/apache/aurora/tools/thermos_observer.py
@@ -25,6 +25,7 @@ from twitter.common.log.options import LogOptions
from twitter.common.quantity import Amount, Time
from apache.aurora.executor.common.path_detector import MesosPathDetector
+from apache.thermos.monitoring.resource import TaskResourceMonitor
from apache.thermos.observer.http.configure import configure_server
from apache.thermos.observer.task_observer import TaskObserver
@@ -60,6 +61,22 @@ app.add_option(
help='The number of seconds between observer refresh attempts.')
+app.add_option(
+ '--task_process_collection_interval_secs',
+ dest='task_process_collection_interval_secs',
+ type='int',
+ default=int(TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+ help='The number of seconds between per task process resource collections.')
+
+
+app.add_option(
+ '--task_disk_collection_interval_secs',
+ dest='task_disk_collection_interval_secs',
+ type='int',
+ default=int(TaskResourceMonitor.DISK_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+ help='The number of seconds between per task disk resource collections.')
+
+
# Allow an interruptible sleep so that ^C works.
def sleep_forever():
while True:
@@ -68,8 +85,11 @@ def sleep_forever():
def initialize(options):
path_detector = MesosPathDetector(options.mesos_root)
- polling_interval = Amount(options.polling_interval_secs, Time.SECONDS)
- return TaskObserver(path_detector, interval=polling_interval)
+ return TaskObserver(
+ path_detector,
+ Amount(options.polling_interval_secs, Time.SECONDS),
+ Amount(options.task_process_collection_interval_secs, Time.SECONDS),
+ Amount(options.task_disk_collection_interval_secs, Time.SECONDS))
def handle_error(exc_type, value, traceback):
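The new flags are plain integer-second options that `initialize()` wraps in `Amount(..., Time.SECONDS)`. The standalone sketch below reproduces their definitions with the standard library's `optparse`, on the assumption that `twitter.common.app.add_option` accepts the same keyword arguments as `OptionParser.add_option` (the patch passes exactly `dest`, `type`, `default` and `help`). `build_parser` and `intervals_from_args` are hypothetical helpers, and the defaults are hard-coded to the 20 and 60 seconds that the patch derives from `TaskResourceMonitor`.

```python
from optparse import OptionParser

# Mirrors TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL and
# DISK_COLLECTION_INTERVAL from the patch, expressed in whole seconds.
DEFAULT_PROCESS_INTERVAL_SECS = 20
DEFAULT_DISK_INTERVAL_SECS = 60


def build_parser():
  parser = OptionParser()
  parser.add_option(
      '--polling_interval_secs', dest='polling_interval_secs', type='int', default=5,
      help='The number of seconds between observer refresh attempts.')
  parser.add_option(
      '--task_process_collection_interval_secs',
      dest='task_process_collection_interval_secs', type='int',
      default=DEFAULT_PROCESS_INTERVAL_SECS,
      help='The number of seconds between per task process resource collections.')
  parser.add_option(
      '--task_disk_collection_interval_secs',
      dest='task_disk_collection_interval_secs', type='int',
      default=DEFAULT_DISK_INTERVAL_SECS,
      help='The number of seconds between per task disk resource collections.')
  return parser


def intervals_from_args(argv):
  """Return (polling, process, disk) intervals in seconds, as initialize() sees them."""
  options, _ = build_parser().parse_args(argv)
  return (options.polling_interval_secs,
          options.task_process_collection_interval_secs,
          options.task_disk_collection_interval_secs)
```

For example, `intervals_from_args(['--task_process_collection_interval_secs=60'])` returns `(5, 60, 60)`, leaving the polling and disk intervals at their defaults.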
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 53d0ff1..4346666 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -118,15 +118,17 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
"""
MAX_HISTORY = 10000 # magic number
+ PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
+ DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
+ HISTORY_TIME = Amount(1, Time.HOURS)
def __init__(self,
task_id,
task_monitor,
- process_collector=ProcessTreeCollector,
disk_collector=DiskCollector,
- process_collection_interval=Amount(20, Time.SECONDS),
- disk_collection_interval=Amount(1, Time.MINUTES),
- history_time=Amount(1, Time.HOURS)):
+ process_collection_interval=PROCESS_COLLECTION_INTERVAL,
+ disk_collection_interval=DISK_COLLECTION_INTERVAL,
+ history_time=HISTORY_TIME):
"""
task_monitor: TaskMonitor object specifying the task whose resources should be monitored
sandbox: Directory for which to monitor disk utilisation
@@ -135,7 +137,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
self._task_id = task_id
log.debug('Initialising resource collection for task %s' % self._task_id)
self._process_collectors = dict() # ProcessStatus => ProcessTreeCollector
- self._process_collector_factory = process_collector
self._disk_collector_class = disk_collector
self._disk_collector = None
self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
@@ -167,7 +168,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
else:
# Since this might be called out of band (before the main loop is aware of the process)
if process not in self._process_collectors:
- self._process_collectors[process] = self._process_collector_factory(process.pid)
+ self._process_collectors[process] = ProcessTreeCollector(process.pid)
self._process_collectors[process].sample()
return self._process_collectors[process].value
@@ -189,7 +190,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
next_disk_collection = 0
while not self._kill_signal.is_set():
-
now = time.time()
if now > next_process_collection:
@@ -199,7 +199,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
for process in current - actives:
self._process_collectors.pop(process)
for process in actives - current:
- self._process_collectors[process] = self._process_collector_factory(process.pid)
+ self._process_collectors[process] = ProcessTreeCollector(process.pid)
for process, collector in self._process_collectors.items():
collector.sample()
@@ -223,6 +223,9 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
except ValueError as err:
log.warning("Error recording resource sample: %s" % err)
+ log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % (
+ self._task_id, (time.time() - now)))
+
# Sleep until any of the following conditions are met:
# - it's time for the next disk collection
# - it's time for the next process collection
@@ -236,6 +239,10 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
else:
waiter = self._kill_signal
- waiter.wait(timeout=max(0, next_collection))
+ if next_collection > 0:
+ waiter.wait(timeout=next_collection)
+ else:
+ log.warning('Task resource collection is backlogged. Consider increasing '
+ 'process_collection_interval and disk_collection_interval.')
log.debug('Stopping resource monitoring for task "%s"' % self._task_id)
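The scheduling change at the end of the collection loop can be isolated as follows. This is a simplified sketch, not the monitor's code: `next_wait` and `wait_or_warn` are hypothetical names, it assumes `next_collection` is the earlier of the two deadlines minus the current time (as the "Sleep until any of the following conditions" comment suggests), it uses the standard `logging` module instead of `twitter.common.log`, and it always waits on the kill signal, whereas the real loop may pick a different waiter in a branch not shown in this hunk.

```python
import logging
import threading
import time

log = logging.getLogger(__name__)


def next_wait(now, next_process_collection, next_disk_collection):
  """Seconds until the earlier of the two collection deadlines (may be <= 0)."""
  return min(next_process_collection, next_disk_collection) - now


def wait_or_warn(kill_signal, now, next_process_collection, next_disk_collection):
  """Sleep until the next deadline, or warn if collection is already behind.

  Returns True if the loop slept (or was woken by kill_signal), False if it
  was backlogged and should start the next collection immediately.
  """
  next_collection = next_wait(now, next_process_collection, next_disk_collection)
  if next_collection > 0:
    kill_signal.wait(timeout=next_collection)
    return True
  log.warning('Task resource collection is backlogged. Consider increasing '
              'process_collection_interval and disk_collection_interval.')
  return False


if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  kill_signal = threading.Event()
  now = time.time()
  # Deadline 0.1s ahead: sleeps briefly, then returns True.
  print(wait_or_warn(kill_signal, now, now + 0.1, now + 60))
  # Deadline already passed: logs the backlog warning and returns False.
  print(wait_or_warn(kill_signal, now, now - 1, now + 60))
```

In both the old `max(0, next_collection)` form and the new one, a backlogged loop starts the next collection immediately; the patch only makes that condition visible in the logs so operators know to raise the intervals.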
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index 1485de8..4bb5d23 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -21,6 +21,7 @@ polls a designated Thermos checkpoint root and collates information about all ta
"""
import os
import threading
+import time
from operator import attrgetter
from twitter.common import log
@@ -31,7 +32,7 @@ from twitter.common.quantity import Amount, Time
from apache.thermos.common.path import TaskPath
from apache.thermos.monitoring.monitor import TaskMonitor
from apache.thermos.monitoring.process import ProcessSample
-from apache.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
+from apache.thermos.monitoring.resource import TaskResourceMonitor
from .detector import ObserverTaskDetector
from .observed_task import ActiveObservedTask, FinishedObservedTask
@@ -55,17 +56,17 @@ class TaskObserver(ExceptionalThread, Lockable):
def __init__(self,
path_detector,
- resource_monitor_class=TaskResourceMonitor,
- interval=POLLING_INTERVAL):
+ interval=POLLING_INTERVAL,
+ task_process_collection_interval=TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL,
+ task_disk_collection_interval=TaskResourceMonitor.DISK_COLLECTION_INTERVAL):
self._detector = ObserverTaskDetector(
path_detector,
self.__on_active,
self.__on_finished,
self.__on_removed)
- if not issubclass(resource_monitor_class, ResourceMonitorBase):
- raise ValueError("resource monitor class must implement ResourceMonitorBase!")
- self._resource_monitor_class = resource_monitor_class
self._interval = interval
+ self._task_process_collection_interval = task_process_collection_interval
+ self._task_disk_collection_interval = task_disk_collection_interval
self._active_tasks = {} # task_id => ActiveObservedTask
self._finished_tasks = {} # task_id => FinishedObservedTask
self._stop_event = threading.Event()
@@ -100,7 +101,11 @@ class TaskObserver(ExceptionalThread, Lockable):
log.error('Found an active task (%s) in finished tasks?' % task_id)
return
task_monitor = TaskMonitor(root, task_id)
- resource_monitor = self._resource_monitor_class(task_id, task_monitor)
+ resource_monitor = TaskResourceMonitor(
+ task_id,
+ task_monitor,
+ process_collection_interval=self._task_process_collection_interval,
+ disk_collection_interval=self._task_disk_collection_interval)
resource_monitor.start()
self._active_tasks[task_id] = ActiveObservedTask(
root,
@@ -132,7 +137,9 @@ class TaskObserver(ExceptionalThread, Lockable):
while not self._stop_event.is_set():
self._stop_event.wait(self._interval.as_(Time.SECONDS))
with self.lock:
+ start = time.time()
self._detector.refresh()
+ log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start))
@Lockable.sync
def process_from_name(self, task_id, process_id):
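The patch times both the checkpoint refresh and each task's collection pass with an inline `start = time.time()` followed by a `log.debug` call. That pattern could be factored into a context manager such as the hypothetical `log_duration` below, sketched with the standard `logging` module in place of `twitter.common.log`. It keeps the patch's wall-clock `time.time()`; on Python 3, `time.monotonic()` would be immune to system clock adjustments.

```python
import logging
import time
from contextlib import contextmanager

log = logging.getLogger(__name__)


@contextmanager
def log_duration(label):
  """Debug-log how long the wrapped block took, in wall-clock seconds.

  Yields a dict whose 'elapsed' entry is filled in when the block exits,
  even if the block raised an exception.
  """
  timing = {'elapsed': None}
  start = time.time()
  try:
    yield timing
  finally:
    timing['elapsed'] = time.time() - start
    log.debug('%s in %.2fs', label, timing['elapsed'])
```

Inside `TaskObserver.run`, the refresh would then read `with log_duration('TaskObserver: finished checkpoint refresh'): self._detector.refresh()`, producing the same debug line as the patch.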
http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
deleted file mode 100644
index e1c8dec..0000000
--- a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import os
-import unittest
-
-from mock import Mock, create_autospec, patch
-from twitter.common.quantity import Amount, Time
-
-from apache.aurora.tools.thermos_observer import initialize
-from apache.thermos.observer.task_observer import TaskObserver
-
-
-class ThermosObserverMainTest(unittest.TestCase):
- def test_initialize(self):
- expected_interval = Amount(15, Time.SECONDS)
- mock_options = Mock(spec_set=['root', 'mesos_root', 'polling_interval_secs'])
- mock_options.root = ''
- mock_options.mesos_root = os.path.abspath('.')
- mock_options.polling_interval_secs = int(expected_interval.as_(Time.SECONDS))
- mock_task_observer = create_autospec(spec=TaskObserver)
- with patch(
- 'apache.aurora.tools.thermos_observer.TaskObserver',
- return_value=mock_task_observer) as mock_observer:
-
- initialize(mock_options)
-
- assert len(mock_observer.mock_calls) == 1
- args = mock_observer.mock_calls[0][2]
- assert expected_interval == args['interval']