You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:42:01 UTC
[8/8] git commit: Updated the monitoring endpoint to return
instantaneous values.
Updated the monitoring endpoint to return instantaneous values.
Review: https://reviews.apache.org/r/13605
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b095fe9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b095fe9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b095fe9
Branch: refs/heads/master
Commit: 8b095fe9f0e37ae80095ae537af690777305f0b7
Parents: c593290
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Jul 25 14:50:09 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800
----------------------------------------------------------------------
src/slave/monitor.cpp | 116 +++++++++++++++++++++++++-----
src/slave/monitor.hpp | 33 ++++++++-
src/tests/monitor_tests.cpp | 152 +++++++++++++++++++++++++++++----------
3 files changed, 243 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 864c1d5..bb3723e 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -16,15 +16,18 @@
* limitations under the License.
*/
+#include <list>
#include <map>
#include <string>
#include <mesos/mesos.hpp>
#include <process/clock.hpp>
+#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/future.hpp>
+#include <process/help.hpp>
#include <process/http.hpp>
#include <process/process.hpp>
#include <process/statistics.hpp>
@@ -38,6 +41,7 @@
using namespace process;
+using std::list;
using std::make_pair;
using std::map;
using std::string;
@@ -164,40 +168,114 @@ void ResourceMonitorProcess::_collect(
}
-Future<http::Response> ResourceMonitorProcess::statisticsJSON(
+ResourceMonitorProcess::Usage ResourceMonitorProcess::usage(
+ const FrameworkID& frameworkId,
+ const ExecutorInfo& executorInfo)
+{
+ Usage usage;
+ usage.frameworkId = frameworkId;
+ usage.executorInfo = executorInfo;
+ usage.statistics = dispatch(
+ isolator, &Isolator::usage, frameworkId, executorInfo.executor_id());
+
+ return usage;
+}
+
+
+Future<http::Response> ResourceMonitorProcess::statistics(
const http::Request& request)
{
- JSON::Array result;
+ return limiter.acquire()
+ .then(defer(self(), &Self::_statistics, request));
+}
- foreachkey (const FrameworkID& frameworkId, executors) {
- foreachkey (const ExecutorID& executorId, executors[frameworkId]) {
- const TimeSeries<ResourceStatistics>& timeseries =
- executors[frameworkId][executorId].statistics;
- if (timeseries.empty()) {
- continue;
- }
+Future<http::Response> ResourceMonitorProcess::_statistics(
+ const http::Request& request)
+{
+ list<Usage> usages;
+ list<Future<ResourceStatistics> > futures;
- const ExecutorInfo& executorInfo =
- executors[frameworkId][executorId].executorInfo;
+ foreachkey (const FrameworkID& frameworkId, executors) {
+ foreachvalue (const MonitoringInfo& info, executors[frameworkId]) {
+ // TODO(bmahler): Consider a batch usage API on the Isolator.
+ usages.push_back(usage(frameworkId, info.executorInfo));
+ futures.push_back(usages.back().statistics);
+ }
+ }
- JSON::Object entry;
- entry.values["framework_id"] = frameworkId.value();
- entry.values["executor_id"] = executorId.value();
- entry.values["executor_name"] = executorInfo.name();
- entry.values["source"] = executorInfo.source();
+ return process::await(futures)
+ .then(defer(self(), &Self::__statistics, usages, request));
+}
- const ResourceStatistics& statistics = timeseries.latest().get().second;
- entry.values["statistics"] = JSON::Protobuf(statistics);
- result.values.push_back(entry);
+Future<http::Response> ResourceMonitorProcess::__statistics(
+ const list<ResourceMonitorProcess::Usage>& usages,
+ const http::Request& request)
+{
+ JSON::Array result;
+
+ foreach (const Usage& usage, usages) {
+ if (usage.statistics.isFailed()) {
+ LOG(WARNING) << "Failed to get resource usage for executor "
+ << usage.executorInfo.executor_id()
+ << " of framework " << usage.frameworkId
+ << ": " << usage.statistics.failure();
+ continue;
+ } else if (usage.statistics.isDiscarded()) {
+ continue;
}
+
+ JSON::Object entry;
+ entry.values["framework_id"] = usage.frameworkId.value();
+ entry.values["executor_id"] = usage.executorInfo.executor_id().value();
+ entry.values["executor_name"] = usage.executorInfo.name();
+ entry.values["source"] = usage.executorInfo.source();
+ entry.values["statistics"] = JSON::Protobuf(usage.statistics.get());
+
+ result.values.push_back(entry);
}
return http::OK(result, request.query.get("jsonp"));
}
+const string ResourceMonitorProcess::STATISTICS_HELP = HELP(
+ TLDR(
+ "Retrieve resource monitoring information."),
+ USAGE(
+ "/statistics.json"),
+ DESCRIPTION(
+ "Returns the current resource consumption data for executors",
+ "running under this slave.",
+ "",
+ "Example:",
+ "",
+ "```",
+ "[{",
+ " \"executor_id\":\"executor\",",
+ " \"executor_name\":\"name\",",
+ " \"framework_id\":\"framework\",",
+ " \"source\":\"source\",",
+ " \"statistics\":",
+ " {",
+ " \"cpus_limit\":8.25,",
+ " \"cpus_nr_periods\":769021,",
+ " \"cpus_nr_throttled\":1046,",
+ " \"cpus_system_time_secs\":34501.45,",
+ " \"cpus_throttled_time_secs\":352.597023453,",
+ " \"cpus_user_time_secs\":96348.84,",
+ " \"mem_anon_bytes\":4845449216,",
+ " \"mem_file_bytes\":260165632,",
+ " \"mem_limit_bytes\":7650410496,",
+ " \"mem_mapped_file_bytes\":7159808,",
+ " \"mem_rss_bytes\":5105614848,",
+ " \"timestamp\":1388534400.0",
+ " }",
+ "}]",
+ "```"));
+
+
ResourceMonitor::ResourceMonitor(Isolator* isolator)
{
process = new ResourceMonitorProcess(isolator);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 20cf20e..b677410 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -26,6 +26,7 @@
#include <mesos/mesos.hpp>
#include <process/future.hpp>
+#include <process/limiter.hpp>
#include <process/statistics.hpp>
#include <stout/cache.hpp>
@@ -95,6 +96,7 @@ public:
ResourceMonitorProcess(Isolator* _isolator)
: ProcessBase("monitor"),
isolator(_isolator),
+ limiter(2, Seconds(1)), // 2 permits per second.
archive(MONITORING_ARCHIVED_TIME_SERIES) {}
virtual ~ResourceMonitorProcess() {}
@@ -112,7 +114,9 @@ public:
protected:
virtual void initialize()
{
- route("/statistics.json", None(), &ResourceMonitorProcess::statisticsJSON);
+ route("/statistics.json",
+ STATISTICS_HELP,
+ &ResourceMonitorProcess::statistics);
// TODO(bmahler): Add an archive.json endpoint that exposes
// historical information, once we have path parameters for
@@ -124,19 +128,42 @@ private:
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const Duration& interval);
-
void _collect(
const process::Future<ResourceStatistics>& statistics,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const Duration& interval);
+ // This is a convenience struct for bundling usage information.
+ struct Usage
+ {
+ FrameworkID frameworkId;
+ ExecutorInfo executorInfo;
+ process::Future<ResourceStatistics> statistics;
+ };
+
+ // Helper for returning the usage for a particular executor.
+ Usage usage(
+ const FrameworkID& frameworkId,
+ const ExecutorInfo& executorInfo);
+
+ // HTTP Endpoints.
// Returns the monitoring statistics. Requests have no parameters.
- process::Future<process::http::Response> statisticsJSON(
+ process::Future<process::http::Response> statistics(
+ const process::http::Request& request);
+ process::Future<process::http::Response> _statistics(
+ const process::http::Request& request);
+ process::Future<process::http::Response> __statistics(
+ const std::list<Usage>& usages,
const process::http::Request& request);
+ static const std::string STATISTICS_HELP;
+
Isolator* isolator;
+ // Used to rate limit the statistics.json endpoint.
+ process::RateLimiter limiter;
+
// Monitoring information for an executor.
struct MonitoringInfo {
// boost::circular_buffer needs a default constructor.
http://git-wip-us.apache.org/repos/asf/mesos/blob/8b095fe9/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 145ef35..7988c90 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -57,7 +57,7 @@ using testing::DoAll;
using testing::Return;
-TEST(MonitorTest, WatchUnwatch)
+TEST(MonitorTest, Collection)
{
FrameworkID frameworkId;
frameworkId.set_value("framework");
@@ -71,46 +71,43 @@ TEST(MonitorTest, WatchUnwatch)
executorInfo.set_name("name");
executorInfo.set_source("source");
- ResourceStatistics initialStatistics;
- initialStatistics.set_cpus_nr_periods(100);
- initialStatistics.set_cpus_nr_throttled(2);
- initialStatistics.set_cpus_user_time_secs(0);
- initialStatistics.set_cpus_system_time_secs(0);
- initialStatistics.set_cpus_throttled_time_secs(0.5);
- initialStatistics.set_cpus_limit(2.5);
- initialStatistics.set_mem_rss_bytes(0);
- initialStatistics.set_mem_file_bytes(0);
- initialStatistics.set_mem_anon_bytes(0);
- initialStatistics.set_mem_mapped_file_bytes(0);
- initialStatistics.set_mem_limit_bytes(2048);
- initialStatistics.set_timestamp(Clock::now().secs());
-
- ResourceStatistics statistics;
- statistics.set_cpus_nr_periods(100);
- statistics.set_cpus_nr_throttled(2);
- statistics.set_cpus_user_time_secs(4);
- statistics.set_cpus_system_time_secs(1);
- statistics.set_cpus_throttled_time_secs(0.5);
- statistics.set_cpus_limit(2.5);
- statistics.set_mem_rss_bytes(1024);
- statistics.set_mem_file_bytes(512);
- statistics.set_mem_anon_bytes(512);
- statistics.set_mem_mapped_file_bytes(256);
- statistics.set_mem_limit_bytes(2048);
- statistics.set_timestamp(
- initialStatistics.timestamp() +
- slave::RESOURCE_MONITORING_INTERVAL.secs());
+ ResourceStatistics statistics1;
+ statistics1.set_cpus_nr_periods(100);
+ statistics1.set_cpus_nr_throttled(2);
+ statistics1.set_cpus_user_time_secs(4);
+ statistics1.set_cpus_system_time_secs(1);
+ statistics1.set_cpus_throttled_time_secs(0.5);
+ statistics1.set_cpus_limit(1.0);
+ statistics1.set_mem_rss_bytes(1024);
+ statistics1.set_mem_file_bytes(0);
+ statistics1.set_mem_anon_bytes(0);
+ statistics1.set_mem_mapped_file_bytes(0);
+ statistics1.set_mem_limit_bytes(2048);
+ statistics1.set_timestamp(0);
+
+ ResourceStatistics statistics2;
+ statistics2.CopyFrom(statistics1);
+ statistics2.set_timestamp(
+ statistics2.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
+
+ ResourceStatistics statistics3;
+ statistics3.CopyFrom(statistics2);
+ statistics3.set_timestamp(
+ statistics3.timestamp() + slave::RESOURCE_MONITORING_INTERVAL.secs());
TestingIsolator isolator;
process::spawn(isolator);
- Future<Nothing> usage1, usage2;
+ Future<Nothing> usage1, usage2, usage3;
EXPECT_CALL(isolator, usage(frameworkId, executorId))
.WillOnce(DoAll(FutureSatisfy(&usage1),
- Return(initialStatistics)))
+ Return(statistics1)))
.WillOnce(DoAll(FutureSatisfy(&usage2),
- Return(statistics)));
+ Return(statistics2)))
+ .WillOnce(DoAll(FutureSatisfy(&usage3),
+ Return(statistics3)));
+
slave::ResourceMonitor monitor(&isolator);
// We pause the clock first in order to make sure that we can
@@ -136,7 +133,7 @@ TEST(MonitorTest, WatchUnwatch)
// Wait until the isolator has finished returning the statistics.
process::Clock::settle();
- // The second collection will populate the cpus_usage.
+ // Expect a second collection to occur after the interval.
process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
process::Clock::settle();
@@ -145,10 +142,92 @@ TEST(MonitorTest, WatchUnwatch)
// Wait until the isolator has finished returning the statistics.
process::Clock::settle();
+ // Expect a third collection to occur after the interval.
+ process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+ process::Clock::settle();
+
+ AWAIT_READY(usage3);
+
+ // Wait until the isolator has finished returning the statistics.
+ process::Clock::settle();
+
+ // Ensure the monitor stops polling the isolator.
+ monitor.unwatch(frameworkId, executorId);
+
+ // Wait until ResourceMonitorProcess::unwatch has completed.
+ process::Clock::settle();
+
+ // This time, Isolator::usage should not get called.
+ EXPECT_CALL(isolator, usage(frameworkId, executorId))
+ .Times(0);
+
+ process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+ process::Clock::settle();
+}
+
+
+TEST(MonitorTest, Statistics)
+{
+ FrameworkID frameworkId;
+ frameworkId.set_value("framework");
+
+ ExecutorID executorId;
+ executorId.set_value("executor");
+
+ ExecutorInfo executorInfo;
+ executorInfo.mutable_executor_id()->CopyFrom(executorId);
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+ executorInfo.set_name("name");
+ executorInfo.set_source("source");
+
+ ResourceStatistics statistics;
+ statistics.set_cpus_nr_periods(100);
+ statistics.set_cpus_nr_throttled(2);
+ statistics.set_cpus_user_time_secs(4);
+ statistics.set_cpus_system_time_secs(1);
+ statistics.set_cpus_throttled_time_secs(0.5);
+ statistics.set_cpus_limit(1.0);
+ statistics.set_mem_file_bytes(0);
+ statistics.set_mem_anon_bytes(0);
+ statistics.set_mem_mapped_file_bytes(0);
+ statistics.set_mem_rss_bytes(1024);
+ statistics.set_mem_limit_bytes(2048);
+ statistics.set_timestamp(0);
+
+ TestingIsolator isolator;
+
+ process::spawn(isolator);
+
+ Future<Nothing> usage;
+ EXPECT_CALL(isolator, usage(frameworkId, executorId))
+ .WillOnce(DoAll(FutureSatisfy(&usage),
+ Return(statistics)));
+
+ slave::ResourceMonitor monitor(&isolator);
+
+ // We pause the clock first to ensure unexpected collections
+ // are avoided.
+ process::Clock::pause();
+
+ monitor.watch(
+ frameworkId,
+ executorId,
+ executorInfo,
+ slave::RESOURCE_MONITORING_INTERVAL);
+
+ // Now wait for ResourceMonitorProcess::watch to finish.
+ process::Clock::settle();
+
process::UPID upid("monitor", process::ip(), process::port());
+ // Request the statistics, this will ask the isolator.
Future<Response> response = process::http::get(upid, "statistics.json");
+ AWAIT_READY(response);
+
+ // The collection should have occurred on the isolator.
+ ASSERT_TRUE(usage.isReady());
+
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
"application/json",
@@ -204,11 +283,12 @@ TEST(MonitorTest, WatchUnwatch)
EXPECT_CALL(isolator, usage(frameworkId, executorId))
.Times(0);
+ response = process::http::get(upid, "statistics.json");
+
+ // Ensure the rate limiter acquires its permit.
process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
process::Clock::settle();
- response = process::http::get(upid, "statistics.json");
-
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ(
"application/json",