You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:41:55 UTC

[2/8] git commit: Refactored the ResourceMonitor to use TimeSeries directly instead of Statistics.

Refactored the ResourceMonitor to use TimeSeries directly instead of Statistics.

Review: https://reviews.apache.org/r/13604


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c5932908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c5932908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c5932908

Branch: refs/heads/master
Commit: c5932908d858f8a3e549a1d9081f89a0bf26368c
Parents: 157e011
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jul 23 20:11:23 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800

----------------------------------------------------------------------
 src/slave/monitor.cpp       | 298 ++++++++-------------------------------
 src/slave/monitor.hpp       |  39 ++++-
 src/tests/monitor_tests.cpp |  18 ++-
 3 files changed, 108 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index b41633f..864c1d5 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -31,14 +31,14 @@
 
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
 
 #include "slave/isolator.hpp"
 #include "slave/monitor.hpp"
 
 using namespace process;
 
-using process::statistics;
-
+using std::make_pair;
 using std::map;
 using std::string;
 
@@ -48,34 +48,13 @@ namespace slave {
 
 using process::wait; // Necessary on some OS's to disambiguate.
 
-// Resource statistics constants.
-// These match the names in the ResourceStatistics protobuf.
-// TODO(bmahler): Later, when we have a richer monitoring story,
-// we will want to publish these outside of this file.
-// TODO(cdel): Check if we need any more of the cgroup stats.
-const std::string CPUS_USER_TIME_SECS   = "cpus_user_time_secs";
-const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
-const std::string CPUS_LIMIT            = "cpus_limit";
-const std::string MEM_RSS_BYTES         = "mem_rss_bytes";
-const std::string MEM_LIMIT_BYTES       = "mem_limit_bytes";
-const std::string MEM_FILE_BYTES        = "mem_file_bytes";
-const std::string MEM_ANON_BYTES        = "mem_anon_bytes";
-const std::string MEM_MAPPED_FILE_BYTES = "mem_mapped_file_bytes";
-const std::string CPUS_NR_PERIODS       = "cpus_nr_periods";
-const std::string CPUS_NR_THROTTLED     = "cpus_nr_throttled";
-const std::string CPUS_THROTTLED_TIME_SECS = "cpus_throttled_time_secs";
-
-
-// Local function prototypes.
-void publish(
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
-    const ResourceStatistics& statistics);
 
-Future<http::Response> _statisticsJSON(
-    const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
-    const map<string, double>& statistics,
-    const Option<string>& jsonp);
+// TODO(bmahler): Consider exposing these as flags should the
+// need arise. These are conservative for the initial version.
+const Duration MONITORING_TIME_SERIES_WINDOW = Weeks(2);
+const size_t MONITORING_TIME_SERIES_CAPACITY = 1000;
+const size_t MONITORING_ARCHIVED_TIME_SERIES = 25;
+
 
 Future<Nothing> ResourceMonitorProcess::watch(
     const FrameworkID& frameworkId,
@@ -83,12 +62,15 @@ Future<Nothing> ResourceMonitorProcess::watch(
     const ExecutorInfo& executorInfo,
     const Duration& interval)
 {
-  if (watches.contains(frameworkId) &&
-      watches[frameworkId].contains(executorId)) {
+  if (executors.contains(frameworkId) &&
+      executors[frameworkId].contains(executorId)) {
     return Failure("Already watched");
   }
 
-  watches[frameworkId][executorId] = executorInfo;
+  executors[frameworkId][executorId] =
+      MonitoringInfo(executorInfo,
+                     MONITORING_TIME_SERIES_WINDOW,
+                     MONITORING_TIME_SERIES_CAPACITY);
 
   // Schedule the resource collection.
   delay(interval, self(), &Self::collect, frameworkId, executorId, interval);
@@ -101,34 +83,17 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
     const FrameworkID& frameworkId,
     const ExecutorID& executorId)
 {
-  const string& prefix =
-    strings::join("/", frameworkId.value(), executorId.value(), "");
-
-  // In case we've already noticed the executor was terminated,
-  // we need to archive the statistics first.
-  // No need to archive CPUS_USAGE as it is implicitly archived along
-  // with CPUS_TIME_SECS.
-  ::statistics->archive("monitor", prefix + CPUS_USER_TIME_SECS);
-  ::statistics->archive("monitor", prefix + CPUS_SYSTEM_TIME_SECS);
-  ::statistics->archive("monitor", prefix + CPUS_LIMIT);
-  ::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
-  ::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
-  ::statistics->archive("monitor", prefix + MEM_FILE_BYTES);
-  ::statistics->archive("monitor", prefix + MEM_ANON_BYTES);
-  ::statistics->archive("monitor", prefix + MEM_MAPPED_FILE_BYTES);
-  ::statistics->archive("monitor", prefix + CPUS_NR_PERIODS);
-  ::statistics->archive("monitor", prefix + CPUS_NR_THROTTLED);
-  ::statistics->archive("monitor", prefix + CPUS_THROTTLED_TIME_SECS);
-
-  if (!watches.contains(frameworkId) ||
-      !watches[frameworkId].contains(executorId)) {
+  if (!executors.contains(frameworkId) ||
+      !executors[frameworkId].contains(executorId)) {
     return Failure("Not watched");
   }
 
-  watches[frameworkId].erase(executorId);
+  // Add the monitoring information to the archive.
+  archive.push_back(executors[frameworkId][executorId]);
+  executors[frameworkId].erase(executorId);
 
-  if (watches[frameworkId].empty()) {
-    watches.erase(frameworkId);
+  if (executors[frameworkId].empty()) {
+    executors.erase(frameworkId);
   }
 
   return Nothing();
@@ -141,8 +106,8 @@ void ResourceMonitorProcess::collect(
     const Duration& interval)
 {
   // Has the executor been unwatched?
-  if (!watches.contains(frameworkId) ||
-      !watches[frameworkId].contains(executorId)) {
+  if (!executors.contains(frameworkId) ||
+      !executors[frameworkId].contains(executorId)) {
     return;
   }
 
@@ -163,23 +128,35 @@ void ResourceMonitorProcess::_collect(
     const Duration& interval)
 {
   // Has the executor been unwatched?
-  if (!watches.contains(frameworkId) ||
-      !watches[frameworkId].contains(executorId)) {
+  if (!executors.contains(frameworkId) ||
+      !executors[frameworkId].contains(executorId)) {
     return;
   }
 
-  if (statistics.isReady()) {
-    // Publish the data to the statistics module.
-    VLOG(1) << "Publishing resource usage for executor '" << executorId
+  if (statistics.isDiscarded()) {
+    // Note that the isolator may have terminated, causing pending
+    // dispatches to be deleted.
+    VLOG(1) << "Ignoring discarded future collecting resource usage for "
+            << "executor '" << executorId
             << "' of framework '" << frameworkId << "'";
-    publish(frameworkId, executorId, statistics.get());
-  } else {
-    // Note that the isolator might have been terminated and pending
-    // dispatches deleted, causing the future to get discarded.
+  } else if (statistics.isFailed()) {
+    // TODO(bmahler): Have the Isolators discard the result when the
+    // executor was killed or completed.
     VLOG(1)
       << "Failed to collect resource usage for executor '" << executorId
-      << "' of framework '" << frameworkId << "': "
-      << (statistics.isFailed() ? statistics.failure() : "Future discarded");
+      << "' of framework '" << frameworkId << "': " << statistics.failure();
+  } else {
+    Try<Time> time = Time::create(statistics.get().timestamp());
+
+    if (time.isError()) {
+      LOG(ERROR) << "Invalid timestamp " << statistics.get().timestamp()
+                 << " for executor '" << executorId
+                 << "' of framework '" << frameworkId << ": " << time.error();
+    } else {
+      // Add the statistics to the time series.
+      executors[frameworkId][executorId].statistics.set(
+          statistics.get(), time.get());
+    }
   }
 
   // Schedule the next collection.
@@ -187,192 +164,37 @@ void ResourceMonitorProcess::_collect(
 }
 
 
-// TODO(bmahler): With slave recovery, executor uuid's will be exposed
-// to the isolator. This means that we will be able to publish
-// statistics per executor run, rather than across all runs.
-void publish(
-    const FrameworkID& frameworkId,
-    const ExecutorID& executorId,
-    const ResourceStatistics& statistics)
-{
-  Try<Time> time_ = Time::create(statistics.timestamp());
-  if (time_.isError()) {
-    LOG(ERROR) << "Not publishing the statistics because we cannot create a "
-               << "Duration from its timestamp: " << time_.error();
-    return;
-  }
-
-  Time time = time_.get();
-
-  const string& prefix =
-    strings::join("/", frameworkId.value(), executorId.value(), "");
-
-  // Publish cpu usage statistics.
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_USER_TIME_SECS,
-      statistics.cpus_user_time_secs(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_SYSTEM_TIME_SECS,
-      statistics.cpus_system_time_secs(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_LIMIT,
-      statistics.cpus_limit(),
-      time);
-
-  // Publish memory statistics.
-  ::statistics->set(
-      "monitor",
-      prefix + MEM_RSS_BYTES,
-      statistics.mem_rss_bytes(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + MEM_LIMIT_BYTES,
-      statistics.mem_limit_bytes(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + MEM_FILE_BYTES,
-      statistics.mem_file_bytes(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + MEM_ANON_BYTES,
-      statistics.mem_anon_bytes(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + MEM_MAPPED_FILE_BYTES,
-      statistics.mem_mapped_file_bytes(),
-      time);
-
-  // Publish cpu.stat statistics.
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_NR_PERIODS,
-      statistics.cpus_nr_periods(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_NR_THROTTLED,
-      statistics.cpus_nr_throttled(),
-      time);
-  ::statistics->set(
-      "monitor",
-      prefix + CPUS_THROTTLED_TIME_SECS,
-      statistics.cpus_throttled_time_secs(),
-      time);
-}
-
-
 Future<http::Response> ResourceMonitorProcess::statisticsJSON(
     const http::Request& request)
 {
-  lambda::function<Future<http::Response>(const map<string, double>&)>
-    _statisticsJSON = lambda::bind(
-      slave::_statisticsJSON,
-      watches,
-      lambda::_1,
-      request.query.get("jsonp"));
-
-  return ::statistics->get("monitor").then(_statisticsJSON);
-}
-
-
-Future<http::Response> _statisticsJSON(
-    const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& watches,
-    const map<string, double>& statistics,
-    const Option<string>& jsonp)
-{
   JSON::Array result;
 
-  foreachkey (const FrameworkID& frameworkId, watches) {
-    foreachkey (const ExecutorID& executorId, watches.get(frameworkId).get()) {
-      const ExecutorInfo& info =
-        watches.get(frameworkId).get().get(executorId).get();
-      const string& prefix =
-        strings::join("/", frameworkId.value(), executorId.value(), "");
-
-      // Export zero values by default.
-      JSON::Object usage;
-      usage.values[CPUS_USER_TIME_SECS] = 0;
-      usage.values[CPUS_SYSTEM_TIME_SECS] = 0;
-      usage.values[CPUS_LIMIT] = 0;
-      usage.values[MEM_RSS_BYTES] = 0;
-      usage.values[MEM_LIMIT_BYTES] = 0;
-      usage.values[MEM_FILE_BYTES] = 0;
-      usage.values[MEM_ANON_BYTES] = 0;
-      usage.values[MEM_MAPPED_FILE_BYTES] = 0;
-      usage.values[CPUS_NR_PERIODS] = 0;
-      usage.values[CPUS_NR_THROTTLED] = 0;
-      usage.values[CPUS_THROTTLED_TIME_SECS] = 0;
-
-      // Set the cpu usage data if present.
-      if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
-        usage.values[CPUS_USER_TIME_SECS] =
-          statistics.find(prefix + CPUS_USER_TIME_SECS)->second;
-      }
-      if (statistics.count(prefix + CPUS_SYSTEM_TIME_SECS) > 0) {
-        usage.values[CPUS_SYSTEM_TIME_SECS] =
-          statistics.find(prefix + CPUS_SYSTEM_TIME_SECS)->second;
-      }
-      if (statistics.count(prefix + CPUS_LIMIT) > 0) {
-        usage.values[CPUS_LIMIT] = statistics.find(prefix + CPUS_LIMIT)->second;
-      }
+  foreachkey (const FrameworkID& frameworkId, executors) {
+    foreachkey (const ExecutorID& executorId, executors[frameworkId]) {
+      const TimeSeries<ResourceStatistics>& timeseries =
+        executors[frameworkId][executorId].statistics;
 
-      // Set the memory usage data if present.
-      if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
-        usage.values[MEM_RSS_BYTES] =
-          statistics.find(prefix + MEM_RSS_BYTES)->second;
-      }
-      if (statistics.count(prefix + MEM_LIMIT_BYTES) > 0) {
-        usage.values[MEM_LIMIT_BYTES] =
-          statistics.find(prefix + MEM_LIMIT_BYTES)->second;
-      }
-      if (statistics.count(prefix + MEM_FILE_BYTES) > 0) {
-        usage.values[MEM_FILE_BYTES] =
-          statistics.find(prefix + MEM_FILE_BYTES)->second;
-      }
-      if (statistics.count(prefix + MEM_ANON_BYTES) > 0) {
-        usage.values[MEM_ANON_BYTES] =
-          statistics.find(prefix + MEM_ANON_BYTES)->second;
-      }
-      if (statistics.count(prefix + MEM_MAPPED_FILE_BYTES) > 0) {
-        usage.values[MEM_MAPPED_FILE_BYTES] =
-          statistics.find(prefix + MEM_MAPPED_FILE_BYTES)->second;
+      if (timeseries.empty()) {
+        continue;
       }
 
-      // Set the cpu.stat data if present.
-      if (statistics.count(prefix + CPUS_NR_PERIODS) > 0) {
-        usage.values[CPUS_NR_PERIODS] =
-          statistics.find(prefix + CPUS_NR_PERIODS)->second;
-      }
-      if (statistics.count(prefix + CPUS_NR_THROTTLED) > 0) {
-        usage.values[CPUS_NR_THROTTLED] =
-          statistics.find(prefix + CPUS_NR_THROTTLED)->second;
-      }
-      if (statistics.count(prefix + CPUS_THROTTLED_TIME_SECS) > 0) {
-        usage.values[CPUS_THROTTLED_TIME_SECS] =
-          statistics.find(prefix + CPUS_THROTTLED_TIME_SECS)->second;
-      }
+      const ExecutorInfo& executorInfo =
+        executors[frameworkId][executorId].executorInfo;
 
       JSON::Object entry;
       entry.values["framework_id"] = frameworkId.value();
       entry.values["executor_id"] = executorId.value();
-      entry.values["executor_name"] = info.name();
-      entry.values["source"] = info.source();
-      entry.values["statistics"] = usage;
+      entry.values["executor_name"] = executorInfo.name();
+      entry.values["source"] = executorInfo.source();
+
+      const ResourceStatistics& statistics = timeseries.latest().get().second;
+      entry.values["statistics"] = JSON::Protobuf(statistics);
 
       result.values.push_back(entry);
     }
   }
 
-  return http::OK(result, jsonp);
+  return http::OK(result, request.query.get("jsonp"));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 8156454..20cf20e 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -21,10 +21,14 @@
 
 #include <string>
 
+#include <boost/circular_buffer.hpp>
+
 #include <mesos/mesos.hpp>
 
 #include <process/future.hpp>
+#include <process/statistics.hpp>
 
+#include <stout/cache.hpp>
 #include <stout/duration.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/nothing.hpp>
@@ -42,6 +46,13 @@ class Isolator;
 class ResourceMonitorProcess;
 
 
+const extern Duration MONITORING_TIME_SERIES_WINDOW;
+const extern size_t MONITORING_TIME_SERIES_CAPACITY;
+
+// Number of time series to maintain for completed executors.
+const extern size_t MONITORING_ARCHIVED_TIME_SERIES;
+
+
 // Provides resource monitoring for executors. Resource usage time
 // series are stored using the Statistics module. Usage information
 // is also exported via a JSON endpoint.
@@ -82,7 +93,9 @@ class ResourceMonitorProcess : public process::Process<ResourceMonitorProcess>
 {
 public:
   ResourceMonitorProcess(Isolator* _isolator)
-    : ProcessBase("monitor"), isolator(_isolator) {}
+    : ProcessBase("monitor"),
+      isolator(_isolator),
+      archive(MONITORING_ARCHIVED_TIME_SERIES) {}
 
   virtual ~ResourceMonitorProcess() {}
 
@@ -100,6 +113,10 @@ protected:
   virtual void initialize()
   {
     route("/statistics.json", None(), &ResourceMonitorProcess::statisticsJSON);
+
+    // TODO(bmahler): Add an archive.json endpoint that exposes
+    // historical information, once we have path parameters for
+    // routes.
   }
 
 private:
@@ -120,8 +137,24 @@ private:
 
   Isolator* isolator;
 
-  // The executor info is stored for each watched executor.
-  hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > watches;
+  // Monitoring information for an executor.
+  struct MonitoringInfo {
+    // hashmap::operator[] (used on 'executors') needs a default constructor.
+    MonitoringInfo() {}
+
+    MonitoringInfo(const ExecutorInfo& _executorInfo,
+                   const Duration& window,
+                   size_t capacity)
+      : executorInfo(_executorInfo), statistics(window, capacity) {}
+
+    ExecutorInfo executorInfo;   // Non-const for assignability.
+    process::TimeSeries<ResourceStatistics> statistics;
+  };
+
+  hashmap<FrameworkID, hashmap<ExecutorID, MonitoringInfo> > executors;
+
+  // Fixed-size history of monitoring information.
+  boost::circular_buffer<MonitoringInfo> archive;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 407ea14..145ef35 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <limits>
 #include <map>
 
 #include <gmock/gmock.h>
@@ -48,6 +49,7 @@ using process::http::NotFound;
 using process::http::OK;
 using process::http::Response;
 
+using std::numeric_limits;
 using std::string;
 
 using testing::_;
@@ -55,9 +57,6 @@ using testing::DoAll;
 using testing::Return;
 
 
-// TODO(bmahler): Add additional tests:
-//   1. Check that the data has been published to statistics.
-//   2. Check that metering is occurring on subsequent resource data.
 TEST(MonitorTest, WatchUnwatch)
 {
   FrameworkID frameworkId;
@@ -73,8 +72,11 @@ TEST(MonitorTest, WatchUnwatch)
   executorInfo.set_source("source");
 
   ResourceStatistics initialStatistics;
+  initialStatistics.set_cpus_nr_periods(100);
+  initialStatistics.set_cpus_nr_throttled(2);
   initialStatistics.set_cpus_user_time_secs(0);
   initialStatistics.set_cpus_system_time_secs(0);
+  initialStatistics.set_cpus_throttled_time_secs(0.5);
   initialStatistics.set_cpus_limit(2.5);
   initialStatistics.set_mem_rss_bytes(0);
   initialStatistics.set_mem_file_bytes(0);
@@ -153,7 +155,8 @@ TEST(MonitorTest, WatchUnwatch)
       "Content-Type",
       response);
 
-  // TODO(bmahler): Verify metering directly through statistics.
+  // TODO(bmahler): Use JSON equality instead to avoid having to use
+  // numeric limits for double precision.
   AWAIT_EXPECT_RESPONSE_BODY_EQ(
       strings::format(
           "[{"
@@ -172,7 +175,9 @@ TEST(MonitorTest, WatchUnwatch)
                   "\"mem_file_bytes\":%lu,"
                   "\"mem_limit_bytes\":%lu,"
                   "\"mem_mapped_file_bytes\":%lu,"
-                  "\"mem_rss_bytes\":%lu"
+                  "\"mem_rss_bytes\":%lu,"
+                  "\"timestamp\":"
+                      "%." + stringify(numeric_limits<double>::digits10) + "g"
               "}"
           "}]",
           statistics.cpus_limit(),
@@ -185,7 +190,8 @@ TEST(MonitorTest, WatchUnwatch)
           statistics.mem_file_bytes(),
           statistics.mem_limit_bytes(),
           statistics.mem_mapped_file_bytes(),
-          statistics.mem_rss_bytes()).get(),
+          statistics.mem_rss_bytes(),
+          statistics.timestamp()).get(),
       response);
 
   // Ensure the monitor stops polling the isolator.