You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by sa...@apache.org on 2017/07/19 01:05:10 UTC

aurora git commit: Observer task page to load consumption info from history

Repository: aurora
Updated Branches:
  refs/heads/master 243d6fa14 -> 908a450f5


Observer task page to load consumption info from history

# Observer task page to load consumption info from history

Resource consumption of Thermos processes is periodically calculated by TaskResourceMonitor threads (one thread per Thermos task). This information is used to display a (semi-)fresh state of the tasks running on a host on the Observer host page, a.k.a. the landing page. An aggregate history of this consumption is kept at the task level, even though TaskResourceMonitor first collects resource usage at the process level and then aggregates it.

On the other hand, when an Observer _task page_ is visited, the resource consumption of the Thermos processes within that task is calculated again and displayed without being aggregated. This can become very slow, because the time needed to complete the resource calculation depends on the load on the host.

This patch takes advantage of that periodic work: the per-process resource information requested by the Observer task page is now served from the already-collected resource consumption history instead of being recalculated.

Testing Done:
I stress-tested this patch on a host that had a slow Observer page. Interestingly, I did not need to do much to make the Observer slow. A few points need to be made clear first:
- We at Twitter limit the resources allocated to the Observer using `systemd`. The Observer is allowed to use only 20% of a CPU core. The attached screenshots are from such a setup.
- With 20% of a CPU core assigned to the Observer, starting only 8 `task`s, each with 3 `process`es, is enough to make the Observer slow: the `task page` takes 11 seconds to load.

Reviewed at https://reviews.apache.org/r/60376/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/908a450f
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/908a450f
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/908a450f

Branch: refs/heads/master
Commit: 908a450f5c1a328f3c945bd73f6a2960c6738f00
Parents: 243d6fa
Author: Reza Motamedi <re...@gmail.com>
Authored: Tue Jul 18 18:03:42 2017 -0700
Committer: Santhosh Kumar <ss...@twitter.com>
Committed: Tue Jul 18 18:03:42 2017 -0700

----------------------------------------------------------------------
 .../apache/thermos/monitoring/resource.py       | 83 +++++++++++++++-----
 .../executor/common/test_resource_manager.py    |  2 +-
 .../apache/thermos/monitoring/test_resource.py  | 60 ++++++++++++--
 3 files changed, 119 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/908a450f/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 4346666..f5e3849 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -51,21 +51,40 @@ class ResourceMonitorBase(Interface):
 
   class Error(Exception): pass
 
-  class ResourceResult(namedtuple('ResourceResult', 'num_procs process_sample disk_usage')):
-    pass
+  class AggregateResourceResult(namedtuple('AggregateResourceResult',
+                                           'num_procs process_sample disk_usage')):
+    """ Class representing task level stats:
+        num_procs: total number of pids initiated by the task
+        process_sample: a .process.ProcessSample object representing resources consumed by the task
+        disk_usage: disk usage consumed in the task's sandbox
+    """
+
+  class FullResourceResult(namedtuple('FullResourceResult', 'proc_usage disk_usage')):
+    """ Class representing detailed information on task level stats:
+        proc_usage: a dictionary mapping ProcessStatus objects to ProcResourceResult objects. One
+                    entry per process in the task
+        disk_usage: disk usage consumed in the task's sandbox
+    """
+
+  class ProcResourceResult(namedtuple('ProcResourceResult', 'process_sample num_procs')):
+    """ Class representing process level stats:
+        process_sample: a .process.ProcessSample object representing resources consumed by
+                        the process
+        num_procs: total number of pids initiated by the process
+    """
 
   @abstractmethod
   def sample(self):
     """ Return a sample of the resource consumption of the task right now
 
-    Returns a tuple of (timestamp, ResourceResult)
+    Returns a tuple of (timestamp, AggregateResourceResult)
     """
 
   @abstractmethod
   def sample_at(self, time):
     """ Return a sample of the resource consumption as close as possible to the specified time
 
-    Returns a tuple of (timestamp, ResourceResult)
+    Returns a tuple of (timestamp, AggregateResourceResult)
     """
 
   @abstractmethod
@@ -77,8 +96,9 @@ class ResourceMonitorBase(Interface):
 
 
 class ResourceHistory(object):
-  """Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
-       mapping: timestamp => (number_of_procs, ProcessSample, disk_usage_in_bytes)
+  """ Simple class to contain a RingBuffer (fixed-length FIFO) history of resource samples, with the
+      mapping:
+      timestamp => ({process_status => (process_sample, number_of_procs)}, disk_usage_in_bytes)
   """
 
   def __init__(self, maxlen, initialize=True):
@@ -87,7 +107,7 @@ class ResourceHistory(object):
     self._maxlen = maxlen
     self._values = RingBuffer(maxlen, None)
     if initialize:
-      self.add(time.time(), ResourceMonitorBase.ResourceResult(0, ProcessSample.empty(), 0))
+      self.add(time.time(), ResourceMonitorBase.FullResourceResult({}, 0))
 
   def add(self, timestamp, value):
     """Store a new resource sample corresponding to the given timestamp"""
@@ -110,6 +130,17 @@ class ResourceHistory(object):
     return 'ResourceHistory(%s)' % ', '.join([str(r) for r in self._values])
 
 
+class HistoryProvider(object):
+  MAX_HISTORY = 10000  # magic number
+
+  def provides(self, history_time, min_collection_interval):
+    history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
+    if history_length > self.MAX_HISTORY:
+      raise ValueError("Requested history length too large")
+    log.debug("Initialising ResourceHistory of length %s" % history_length)
+    return ResourceHistory(history_length)
+
+
 class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
   """ Lightweight thread to aggregate resource consumption for a task's constituent processes.
       Actual resource calculation is delegated to collectors; this class periodically polls the
@@ -117,7 +148,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       history of previous sample results.
   """
 
-  MAX_HISTORY = 10000  # magic number
   PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
   DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
   HISTORY_TIME = Amount(1, Time.HOURS)
@@ -128,7 +158,8 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
                disk_collector=DiskCollector,
                process_collection_interval=PROCESS_COLLECTION_INTERVAL,
                disk_collection_interval=DISK_COLLECTION_INTERVAL,
-               history_time=HISTORY_TIME):
+               history_time=HISTORY_TIME,
+               history_provider=HistoryProvider()):
     """
       task_monitor: TaskMonitor object specifying the task whose resources should be monitored
       sandbox: Directory for which to monitor disk utilisation
@@ -142,11 +173,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
     self._disk_collection_interval = disk_collection_interval.as_(Time.SECONDS)
     min_collection_interval = min(self._process_collection_interval, self._disk_collection_interval)
-    history_length = int(history_time.as_(Time.SECONDS) / min_collection_interval)
-    if history_length > self.MAX_HISTORY:
-      raise ValueError("Requested history length too large")
-    log.debug("Initialising ResourceHistory of length %s" % history_length)
-    self._history = ResourceHistory(history_length)
+    self._history = history_provider.provides(history_time, min_collection_interval)
     self._kill_signal = threading.Event()
     ExceptionalThread.__init__(self, name='%s[%s]' % (self.__class__.__name__, task_id))
     self.daemon = True
@@ -157,7 +184,14 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     return self.sample_at(time.time())
 
   def sample_at(self, timestamp):
-    return self._history.get(timestamp)
+    _timestamp, full_resources = self._history.get(timestamp)
+
+    aggregated_procs = sum(map(attrgetter('num_procs'), full_resources.proc_usage.values()))
+    aggregated_sample = sum(map(attrgetter('process_sample'), full_resources.proc_usage.values()),
+        ProcessSample.empty())
+
+    return _timestamp, self.AggregateResourceResult(
+        aggregated_procs, aggregated_sample, full_resources.disk_usage)
 
   def sample_by_process(self, process_name):
     try:
@@ -170,6 +204,12 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       if process not in self._process_collectors:
         self._process_collectors[process] = ProcessTreeCollector(process.pid)
 
+      # The sample obtained from history is tuple of (timestamp, FullResourceResult), and per
+      # process sample can be lookup up from FullResourceResult
+      _, full_resources = self._history.get(time.time())
+      if process in full_resources.proc_usage:
+        return full_resources.proc_usage[process].process_sample
+
       self._process_collectors[process].sample()
       return self._process_collectors[process].value
 
@@ -215,11 +255,14 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
           log.debug('No sandbox detected yet for %s' % self._task_id)
 
       try:
-        aggregated_procs = sum(map(attrgetter('procs'), self._process_collectors.values()))
-        aggregated_sample = sum(map(attrgetter('value'), self._process_collectors.values()),
-                                ProcessSample.empty())
-        disk_value = self._disk_collector.value if self._disk_collector else 0
-        self._history.add(now, self.ResourceResult(aggregated_procs, aggregated_sample, disk_value))
+        disk_usage = self._disk_collector.value if self._disk_collector else 0
+
+        proc_usage_dict = dict()
+        for process, collector in self._process_collectors.items():
+          proc_usage_dict.update({process: self.ProcResourceResult(collector.value,
+              collector.procs)})
+
+        self._history.add(now, self.FullResourceResult(proc_usage_dict, disk_usage))
       except ValueError as err:
         log.warning("Error recording resource sample: %s" % err)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/908a450f/src/test/python/apache/aurora/executor/common/test_resource_manager.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_resource_manager.py b/src/test/python/apache/aurora/executor/common/test_resource_manager.py
index a898e4d..23b3779 100644
--- a/src/test/python/apache/aurora/executor/common/test_resource_manager.py
+++ b/src/test/python/apache/aurora/executor/common/test_resource_manager.py
@@ -25,7 +25,7 @@ def _mock_resource_monitor(num_procs=0, process_sample=ProcessSample.empty(), di
   mock_resource_monitor = mock.Mock(spec=ResourceMonitorBase)
   mock_resource_monitor.sample.return_value = (
       12345,  # timestamp
-      ResourceMonitorBase.ResourceResult(num_procs, process_sample, disk_usage))
+      ResourceMonitorBase.AggregateResourceResult(num_procs, process_sample, disk_usage))
 
   return mock_resource_monitor
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/908a450f/src/test/python/apache/thermos/monitoring/test_resource.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/monitoring/test_resource.py b/src/test/python/apache/thermos/monitoring/test_resource.py
index d794a99..e577e55 100644
--- a/src/test/python/apache/thermos/monitoring/test_resource.py
+++ b/src/test/python/apache/thermos/monitoring/test_resource.py
@@ -16,10 +16,13 @@ from time import time
 from unittest import TestCase
 
 import mock
+import pytest
+from twitter.common.quantity import Amount, Time
 
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
 from apache.thermos.monitoring.resource import (
+    HistoryProvider,
     ResourceHistory,
     ResourceMonitorBase,
     TaskResourceMonitor
@@ -28,6 +31,12 @@ from apache.thermos.monitoring.resource import (
 from gen.apache.thermos.ttypes import ProcessStatus
 
 
+class TestResourceHistoryProvider(TestCase):
+  def test_too_long_history(self):
+    with pytest.raises(ValueError):
+      HistoryProvider().provides(Amount(1, Time.DAYS), 1)
+
+
 class TestResourceHistory(TestCase):
   def setUp(self):
     self.max_len = 4
@@ -35,7 +44,7 @@ class TestResourceHistory(TestCase):
 
   def test_add(self):
     next_resource_stamp = time() + 100
-    value = ResourceMonitorBase.ResourceResult(1, 1, 0)
+    value = ResourceMonitorBase.FullResourceResult({}, 0)
 
     assert (next_resource_stamp, value) not in self.resource_history._values
     self.resource_history.add(next_resource_stamp, value)
@@ -47,8 +56,9 @@ class TestResourceHistory(TestCase):
 
   def test_get(self):
     resource_stamp = time() + 100
-    value = ResourceMonitorBase.ResourceResult(1, 1, 0)
-    value_wrong = ResourceMonitorBase.ResourceResult(1, 1, 50)
+
+    value = ResourceMonitorBase.FullResourceResult({}, 0)
+    value_wrong = ResourceMonitorBase.FullResourceResult({}, 50)
 
     self.resource_history.add(resource_stamp, value)
     self.resource_history.add(resource_stamp + 1000, value_wrong)
@@ -56,12 +66,19 @@ class TestResourceHistory(TestCase):
     assert resource_stamp, value == self.resource_history.get(resource_stamp)
 
 
-class TestTaskResouceMonitor(TestCase):
+class TestTaskResourceMonitor(TestCase):
+  class FakeResourceHistoryProvider(object):
+    def __init__(self, history):
+      self.history = history
+
+    def provides(self, history_time, min_collection_interval):
+      return self.history
+
   @mock.patch('apache.thermos.monitoring.process_collector_psutil.ProcessTreeCollector.sample',
       autospec=True, spec_set=True)
   @mock.patch('apache.thermos.monitoring.monitor.TaskMonitor.get_active_processes',
       autospec=True, spec_set=True)
-  def test_sample_by_process(self, mock_get_active_processes, mock_sample):
+  def test_sample_by_process_without_history(self, mock_get_active_processes, mock_sample):
     fake_process_name = 'fake-process-name'
     task_path = '.'
     task_monitor = TaskMonitor(task_path, 'fake-task-id')
@@ -80,6 +97,39 @@ class TestTaskResouceMonitor(TestCase):
 
   @mock.patch('apache.thermos.monitoring.monitor.TaskMonitor.get_active_processes',
       autospec=True, spec_set=True)
+  def test_sample_by_process_from_history(self, mock_get_active_processes):
+
+    fake_process_name_1 = 'fake-process-name-1'
+    fake_process_name_2 = 'fake-process-name-2'
+    task_path = '.'
+    task_monitor = TaskMonitor(task_path, 'fake-task-id')
+    fake_process_status_1 = ProcessStatus(process=fake_process_name_1)
+    fake_process_status_2 = ProcessStatus(process=fake_process_name_2)
+    mock_get_active_processes.return_value = [(fake_process_status_1, 1),
+                                              (fake_process_status_2, 2)]
+
+    fake_history = ResourceHistory(2)
+    fake_history.add(time(), ResourceMonitorBase.FullResourceResult(
+        {fake_process_status_1: ResourceMonitorBase.ProcResourceResult(ProcessSample.empty(), 1),
+         fake_process_status_2: ResourceMonitorBase.ProcResourceResult(ProcessSample.empty(), 2),
+         }, 10))
+
+    task_resource_monitor = TaskResourceMonitor('fake-task-id', task_monitor,
+        history_provider=self.FakeResourceHistoryProvider(fake_history))
+
+    assert task_resource_monitor.name == 'TaskResourceMonitor[fake-task-id]'
+    assert task_resource_monitor.sample_by_process(fake_process_name_1) == ProcessSample.empty()
+    assert task_resource_monitor.sample_by_process(fake_process_name_2) == ProcessSample.empty()
+
+    _, sample = task_resource_monitor.sample()
+    assert sample.num_procs == 3  # 1 pid in fake_process_status_1 and 2 in fake_process_status_2
+    assert sample.process_sample == ProcessSample.empty()
+    assert sample.disk_usage == 10
+    assert mock_get_active_processes.mock_calls == [mock.call(task_monitor),
+        mock.call(task_monitor)]
+
+  @mock.patch('apache.thermos.monitoring.monitor.TaskMonitor.get_active_processes',
+      autospec=True, spec_set=True)
   def test_sample_by_process_no_process(self, mock_get_active_processes):
     task_path = '.'