You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:42:01 UTC

[8/8] git commit: Updated the monitoring endpoint to return instantaneous values.

Updated the monitoring endpoint to return instantaneous values.

Review: https://reviews.apache.org/r/13605


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b095fe9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b095fe9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b095fe9

Branch: refs/heads/master
Commit: 8b095fe9f0e37ae80095ae537af690777305f0b7
Parents: c593290
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jul 25 14:50:09 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800

----------------------------------------------------------------------
 src/slave/monitor.cpp       | 116 +++++++++++++++++++++++++-----
 src/slave/monitor.hpp       |  33 ++++++++-
 src/tests/monitor_tests.cpp | 152 +++++++++++++++++++++++++++++----------
 3 files changed, 243 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 864c1d5..bb3723e 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -16,15 +16,18 @@
  * limitations under the License.
  */
 
+#include <list>
 #include <map>
 #include <string>
 
 #include <mesos/mesos.hpp>
 
 #include <process/clock.hpp>
+#include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/future.hpp>
+#include <process/help.hpp>
 #include <process/http.hpp>
 #include <process/process.hpp>
 #include <process/statistics.hpp>
@@ -38,6 +41,7 @@
 
 using namespace process;
 
+using std::list;
 using std::make_pair;
 using std::map;
 using std::string;
@@ -164,40 +168,114 @@ void ResourceMonitorProcess::_collect(
 }
 
 
-Future<http::Response> ResourceMonitorProcess::statisticsJSON(
+ResourceMonitorProcess::Usage ResourceMonitorProcess::usage(
+    const FrameworkID& frameworkId,
+    const ExecutorInfo& executorInfo)
+{
+  Usage usage;
+  usage.frameworkId = frameworkId;
+  usage.executorInfo = executorInfo;
+  usage.statistics = dispatch(
+      isolator, &Isolator::usage, frameworkId, executorInfo.executor_id());
+
+  return usage;
+}
+
+
+Future<http::Response> ResourceMonitorProcess::statistics(
     const http::Request& request)
 {
-  JSON::Array result;
+  return limiter.acquire()
+    .then(defer(self(), &Self::_statistics, request));
+}
 
-  foreachkey (const FrameworkID& frameworkId, executors) {
-    foreachkey (const ExecutorID& executorId, executors[frameworkId]) {
-      const TimeSeries<ResourceStatistics>& timeseries =
-        executors[frameworkId][executorId].statistics;
 
-      if (timeseries.empty()) {
-        continue;
-      }
+Future<http::Response> ResourceMonitorProcess::_statistics(
+    const http::Request& request)
+{
+  list<Usage> usages;
+  list<Future<ResourceStatistics> > futures;
 
-      const ExecutorInfo& executorInfo =
-        executors[frameworkId][executorId].executorInfo;
+  foreachkey (const FrameworkID& frameworkId, executors) {
+    foreachvalue (const MonitoringInfo& info, executors[frameworkId]) {
+      // TODO(bmahler): Consider a batch usage API on the Isolator.
+      usages.push_back(usage(frameworkId, info.executorInfo));
+      futures.push_back(usages.back().statistics);
+    }
+  }
 
-      JSON::Object entry;
-      entry.values["framework_id"] = frameworkId.value();
-      entry.values["executor_id"] = executorId.value();
-      entry.values["executor_name"] = executorInfo.name();
-      entry.values["source"] = executorInfo.source();
+  return process::await(futures)
+    .then(defer(self(), &Self::__statistics, usages, request));
+}
 
-      const ResourceStatistics& statistics = timeseries.latest().get().second;
-      entry.values["statistics"] = JSON::Protobuf(statistics);
 
-      result.values.push_back(entry);
+Future<http::Response> ResourceMonitorProcess::__statistics(
+    const list<ResourceMonitorProcess::Usage>& usages,
+    const http::Request& request)
+{
+  JSON::Array result;
+
+  foreach (const Usage& usage, usages) {
+    if (usage.statistics.isFailed()) {
+      LOG(WARNING) << "Failed to get resource usage for executor "
+                   << usage.executorInfo.executor_id()
+                   << " of framework " << usage.frameworkId
+                   << ": " << usage.statistics.failure();
+      continue;
+    } else if (usage.statistics.isDiscarded()) {
+      continue;
     }
+
+    JSON::Object entry;
+    entry.values["framework_id"] = usage.frameworkId.value();
+    entry.values["executor_id"] = usage.executorInfo.executor_id().value();
+    entry.values["executor_name"] = usage.executorInfo.name();
+    entry.values["source"] = usage.executorInfo.source();
+    entry.values["statistics"] = JSON::Protobuf(usage.statistics.get());
+
+    result.values.push_back(entry);
   }
 
   return http::OK(result, request.query.get("jsonp"));
 }
 
 
+const string ResourceMonitorProcess::STATISTICS_HELP = HELP(
+    TLDR(
+        "Retrieve resource monitoring information."),
+    USAGE(
+        "/statistics.json"),
+    DESCRIPTION(
+        "Returns the current resource consumption data for executors",
+        "running under this slave.",
+        "",
+        "Example:",
+        "",
+        "```",
+        "[{",
+        "    \"executor_id\":\"executor\",",
+        "    \"executor_name\":\"name\",",
+        "    \"framework_id\":\"framework\",",
+        "    \"source\":\"source\",",
+        "    \"statistics\":",
+        "    {",
+        "        \"cpus_limit\":8.25,",
+        "        \"cpus_nr_periods\":769021,",
+        "        \"cpus_nr_throttled\":1046,",
+        "        \"cpus_system_time_secs\":34501.45,",
+        "        \"cpus_throttled_time_secs\":352.597023453,",
+        "        \"cpus_user_time_secs\":96348.84,",
+        "        \"mem_anon_bytes\":4845449216,",
+        "        \"mem_file_bytes\":260165632,",
+        "        \"mem_limit_bytes\":7650410496,",
+        "        \"mem_mapped_file_bytes\":7159808,",
+        "        \"mem_rss_bytes\":5105614848,",
+        "        \"timestamp\":1388534400.0",
+        "    }",
+        "}]",
+        "```"));
+
+
 ResourceMonitor::ResourceMonitor(Isolator* isolator)
 {
   process = new ResourceMonitorProcess(isolator);

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 20cf20e..b677410 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -26,6 +26,7 @@
 #include <mesos/mesos.hpp>
 
 #include <process/future.hpp>
+#include <process/limiter.hpp>
 #include <process/statistics.hpp>
 
 #include <stout/cache.hpp>
@@ -95,6 +96,7 @@ public:
   ResourceMonitorProcess(Isolator* _isolator)
     : ProcessBase("monitor"),
       isolator(_isolator),
+      limiter(2, Seconds(1)), // 2 permits per second.
       archive(MONITORING_ARCHIVED_TIME_SERIES) {}
 
   virtual ~ResourceMonitorProcess() {}
@@ -112,7 +114,9 @@ public:
 protected:
   virtual void initialize()
   {
-    route("/statistics.json", None(), &ResourceMonitorProcess::statisticsJSON);
+    route("/statistics.json",
+          STATISTICS_HELP,
+          &ResourceMonitorProcess::statistics);
 
     // TODO(bmahler): Add a archive.json endpoint that exposes
     // historical information, once we have path parameters for
@@ -124,19 +128,42 @@ private:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
       const Duration& interval);
-
   void _collect(
       const process::Future<ResourceStatistics>& statistics,
       const FrameworkID& frameworkId,
       const ExecutorID& executorId,
       const Duration& interval);
 
+  // This is a convenience struct for bundling usage information.
+  struct Usage
+  {
+    FrameworkID frameworkId;
+    ExecutorInfo executorInfo;
+    process::Future<ResourceStatistics> statistics;
+  };
+
+  // Helper for returning the usage for a particular executor.
+  Usage usage(
+      const FrameworkID& frameworkId,
+      const ExecutorInfo& executorInfo);
+
+  // HTTP Endpoints.
   // Returns the monitoring statistics. Requests have no parameters.
-  process::Future<process::http::Response> statisticsJSON(
+  process::Future<process::http::Response> statistics(
+      const process::http::Request& request);
+  process::Future<process::http::Response> _statistics(
+      const process::http::Request& request);
+  process::Future<process::http::Response> __statistics(
+      const std::list<Usage>& usages,
       const process::http::Request& request);
 
+  static const std::string STATISTICS_HELP;
+
   Isolator* isolator;
 
+  // Used to rate limit the statistics.json endpoint.
+  process::RateLimiter limiter;
+
   // Monitoring information for an executor.
   struct MonitoringInfo {
     // boost::circular_buffer needs a default constructor.

http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 145ef35..7988c90 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -57,7 +57,7 @@ using testing::DoAll;
 using testing::Return;
 
 
-TEST(MonitorTest, WatchUnwatch)
+TEST(MonitorTest, Collection)
 {
   FrameworkID frameworkId;
   frameworkId.set_value("framework");
@@ -71,46 +71,43 @@ TEST(MonitorTest, WatchUnwatch)
   executorInfo.set_name("name");
   executorInfo.set_source("source");
 
-  ResourceStatistics initialStatistics;
-  initialStatistics.set_cpus_nr_periods(100);
-  initialStatistics.set_cpus_nr_throttled(2);
-  initialStatistics.set_cpus_user_time_secs(0);
-  initialStatistics.set_cpus_system_time_secs(0);
-  initialStatistics.set_cpus_throttled_time_secs(0.5);
-  initialStatistics.set_cpus_limit(2.5);
-  initialStatistics.set_mem_rss_bytes(0);
-  initialStatistics.set_mem_file_bytes(0);
-  initialStatistics.set_mem_anon_bytes(0);
-  initialStatistics.set_mem_mapped_file_bytes(0);
-  initialStatistics.set_mem_limit_bytes(2048);
-  initialStatistics.set_timestamp(Clock::now().secs());
-
-  ResourceStatistics statistics;
-  statistics.set_cpus_nr_periods(100);
-  statistics.set_cpus_nr_throttled(2);
-  statistics.set_cpus_user_time_secs(4);
-  statistics.set_cpus_system_time_secs(1);
-  statistics.set_cpus_throttled_time_secs(0.5);
-  statistics.set_cpus_limit(2.5);
-  statistics.set_mem_rss_bytes(1024);
-  statistics.set_mem_file_bytes(512);
-  statistics.set_mem_anon_bytes(512);
-  statistics.set_mem_mapped_file_bytes(256);
-  statistics.set_mem_limit_bytes(2048);
-  statistics.set_timestamp(
-      initialStatistics.timestamp() +
-      slave::RESOURCE_MONITORING_INTERVAL.secs());
+  ResourceStatistics statistics1;
+  statistics1.set_cpus_nr_periods(100);
+  statistics1.set_cpus_nr_throttled(2);
+  statistics1.set_cpus_user_time_secs(4);
+  statistics1.set_cpus_system_time_secs(1);
+  statistics1.set_cpus_throttled_time_secs(0.5);
+  statistics1.set_cpus_limit(1.0);
+  statistics1.set_mem_rss_bytes(1024);
+  statistics1.set_mem_file_bytes(0);
+  statistics1.set_mem_anon_bytes(0);
+  statistics1.set_mem_mapped_file_bytes(0);
+  statistics1.set_mem_limit_bytes(2048);
+  statistics1.set_timestamp(0);
+
+  ResourceStatistics statistics2;
+  statistics2.CopyFrom(statistics1);
+  statistics2.set_timestamp(
+      statistics2.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
+
+  ResourceStatistics statistics3;
+  statistics3.CopyFrom(statistics2);
+  statistics3.set_timestamp(
+      statistics3.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
 
   TestingIsolator isolator;
 
   process::spawn(isolator);
 
-  Future<Nothing> usage1, usage2;
+  Future<Nothing> usage1, usage2, usage3;
   EXPECT_CALL(isolator, usage(frameworkId, executorId))
     .WillOnce(DoAll(FutureSatisfy(&usage1),
-                    Return(initialStatistics)))
+                    Return(statistics1)))
     .WillOnce(DoAll(FutureSatisfy(&usage2),
-                    Return(statistics)));
+                    Return(statistics2)))
+    .WillOnce(DoAll(FutureSatisfy(&usage3),
+                    Return(statistics3)));
+
   slave::ResourceMonitor monitor(&isolator);
 
   // We pause the clock first in order to make sure that we can
@@ -136,7 +133,7 @@ TEST(MonitorTest, WatchUnwatch)
   // Wait until the isolator has finished returning the statistics.
   process::Clock::settle();
 
-  // The second collection will populate the cpus_usage.
+  // Expect a second collection to occur after the interval.
   process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
   process::Clock::settle();
 
@@ -145,10 +142,92 @@ TEST(MonitorTest, WatchUnwatch)
   // Wait until the isolator has finished returning the statistics.
   process::Clock::settle();
 
+  // Expect a third collection to occur after the interval.
+  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+  process::Clock::settle();
+
+  AWAIT_READY(usage3);
+
+  // Wait until the isolator has finished returning the statistics.
+  process::Clock::settle();
+
+  // Ensure the monitor stops polling the isolator.
+  monitor.unwatch(frameworkId, executorId);
+
+  // Wait until ResourceMonitorProcess::unwatch has completed.
+  process::Clock::settle();
+
+  // This time, Isolator::usage should not get called.
+  EXPECT_CALL(isolator, usage(frameworkId, executorId))
+    .Times(0);
+
+  process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+  process::Clock::settle();
+}
+
+
+TEST(MonitorTest, Statistics)
+{
+  FrameworkID frameworkId;
+  frameworkId.set_value("framework");
+
+  ExecutorID executorId;
+  executorId.set_value("executor");
+
+  ExecutorInfo executorInfo;
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+  executorInfo.set_name("name");
+  executorInfo.set_source("source");
+
+  ResourceStatistics statistics;
+  statistics.set_cpus_nr_periods(100);
+  statistics.set_cpus_nr_throttled(2);
+  statistics.set_cpus_user_time_secs(4);
+  statistics.set_cpus_system_time_secs(1);
+  statistics.set_cpus_throttled_time_secs(0.5);
+  statistics.set_cpus_limit(1.0);
+  statistics.set_mem_file_bytes(0);
+  statistics.set_mem_anon_bytes(0);
+  statistics.set_mem_mapped_file_bytes(0);
+  statistics.set_mem_rss_bytes(1024);
+  statistics.set_mem_limit_bytes(2048);
+  statistics.set_timestamp(0);
+
+  TestingIsolator isolator;
+
+  process::spawn(isolator);
+
+  Future<Nothing> usage;
+  EXPECT_CALL(isolator, usage(frameworkId, executorId))
+    .WillOnce(DoAll(FutureSatisfy(&usage),
+                    Return(statistics)));
+
+  slave::ResourceMonitor monitor(&isolator);
+
+  // We pause the clock first to ensure unexpected collections
+  // are avoided.
+  process::Clock::pause();
+
+  monitor.watch(
+      frameworkId,
+      executorId,
+      executorInfo,
+      slave::RESOURCE_MONITORING_INTERVAL);
+
+  // Now wait for ResourceMonitorProcess::watch to finish.
+  process::Clock::settle();
+
   process::UPID upid("monitor", process::ip(), process::port());
 
+  // Request the statistics, this will ask the isolator.
   Future<Response> response = process::http::get(upid, "statistics.json");
 
+  AWAIT_READY(response);
+
+  // The collection should have occurred on the isolator.
+  ASSERT_TRUE(usage.isReady());
+
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(
       "application/json",
@@ -204,11 +283,12 @@ TEST(MonitorTest, WatchUnwatch)
   EXPECT_CALL(isolator, usage(frameworkId, executorId))
     .Times(0);
 
+  response = process::http::get(upid, "statistics.json");
+
+  // Ensure the rate limiter acquires its permit.
   process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
   process::Clock::settle();
 
-  response = process::http::get(upid, "statistics.json");
-
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
   AWAIT_EXPECT_RESPONSE_HEADER_EQ(
       "application/json",