You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/06/08 01:17:52 UTC
git commit: Added a new 'statistics.json' endpoint to the
ResourceMonitor; this deprecates the old 'usage.json' endpoint.
Updated Branches:
refs/heads/master 3f07a9437 -> 1523e5e04
Added a new 'statistics.json' endpoint to the ResourceMonitor; this
deprecates the old 'usage.json' endpoint.
This is v2 of the monitoring statistics, added with a new 'statistics.json' endpoint.
The 'usage.json' endpoint is now deprecated, and will be removed at a later point.
From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/11641
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/1523e5e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/1523e5e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/1523e5e0
Branch: refs/heads/master
Commit: 1523e5e04332b455f658bdaf6a01f2c3e581b29e
Parents: 3f07a94
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri Jun 7 16:17:19 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 16:17:19 2013 -0700
----------------------------------------------------------------------
include/mesos/mesos.proto | 15 ++--
src/slave/cgroups_isolator.cpp | 21 ++++-
src/slave/cgroups_isolator.hpp | 2 +
src/slave/monitor.cpp | 155 ++++++++++++++++++++++++++++++-----
src/slave/monitor.hpp | 10 ++
src/slave/process_isolator.cpp | 51 +++++++++---
src/slave/process_isolator.hpp | 1 +
src/tests/isolator_tests.cpp | 21 ++++--
src/tests/monitor_tests.cpp | 77 +++++++++++++++---
9 files changed, 294 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index ece6559..8cbcd9a 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -263,15 +263,18 @@ message ResourceStatistics {
required double timestamp = 1; // Snapshot time, in seconds since the Epoch.
// CPU Usage Information:
- // A percentage based cpu usage rate since the last snapshot.
- // This is akin to what the 'top' program shows.
- optional double cpu_usage = 2;
// Total CPU time spent in user mode, and kernel mode.
- optional double cpu_user_time = 3; // In seconds.
- optional double cpu_system_time = 4; // In seconds.
+ optional double cpus_user_time_secs = 2;
+ optional double cpus_system_time_secs = 3;
+
+ // Number of CPUs allocated.
+ required double cpus_limit = 4;
// Memory Usage Information:
- optional uint64 memory_rss = 5; // Resident Set Size (in bytes).
+ optional uint64 mem_rss_bytes = 5; // Resident Set Size.
+
+ // Amount of memory resources allocated.
+ optional uint64 mem_limit_bytes = 6;
// TODO(bmahler): Add disk usage.
// TODO(bmahler): Add network usage?
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 9b3a3a5..553844b 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -651,6 +651,8 @@ void CgroupsIsolator::resourcesChanged(
return;
}
+ info->resources = resources;
+
LOG(INFO) << "Changing cgroup controls for executor " << executorId
<< " of framework " << frameworkId
<< " with resources " << resources;
@@ -689,6 +691,17 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
ResourceStatistics result;
result.set_timestamp(Clock::now().secs());
+ // Set the resource allocations.
+ Option<Bytes> mem = info->resources.mem();
+ if (mem.isSome()) {
+ result.set_mem_limit_bytes(mem.get().bytes());
+ }
+
+ Option<double> cpus = info->resources.cpus();
+ if (cpus.isSome()) {
+ result.set_cpus_limit(cpus.get());
+ }
+
Try<hashmap<string, uint64_t> > stat =
cgroups::stat(hierarchy, info->name(), "cpuacct.stat");
@@ -700,8 +713,10 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
// TODO(bmahler): Add namespacing to cgroups to enforce the expected
// structure, e.g., cgroups::cpuacct::stat.
if (stat.get().contains("user") && stat.get().contains("system")) {
- result.set_cpu_user_time((double) stat.get()["user"] / (double) ticks);
- result.set_cpu_system_time((double) stat.get()["system"] / (double) ticks);
+ result.set_cpus_user_time_secs(
+ (double) stat.get()["user"] / (double) ticks);
+ result.set_cpus_system_time_secs(
+ (double) stat.get()["system"] / (double) ticks);
}
stat = cgroups::stat(hierarchy, info->name(), "memory.stat");
@@ -714,7 +729,7 @@ Future<ResourceStatistics> CgroupsIsolator::usage(
// TODO(bmahler): Add namespacing to cgroups to enforce the expected
// structure, e.g., cgroups::memory::stat.
if (stat.get().contains("rss")) {
- result.set_memory_rss(stat.get()["rss"]);
+ result.set_mem_rss_bytes(stat.get()["rss"]);
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 7b8270d..124a4b3 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -169,6 +169,8 @@ private:
Flags flags; // Slave flags.
+ Resources resources; // Resources allocated to the cgroup.
+
// Used to cancel the OOM listening.
process::Future<uint64_t> oomNotifier;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index 5de1c15..4f3c91f 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -37,6 +37,8 @@
using namespace process;
+using process::statistics;
+
using std::map;
using std::string;
@@ -49,7 +51,15 @@ using process::wait; // Necessary on some OS's to disambiguate.
// Resource statistics constants.
// These match the names in the ResourceStatistics protobuf.
// TODO(bmahler): Later, when we have a richer monitoring story,
-// we will want to publish these outisde of this file.
+// we will want to publish these outside of this file.
+const std::string CPUS_TIME_SECS = "cpus_time_secs";
+const std::string CPUS_USER_TIME_SECS = "cpus_user_time_secs";
+const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
+const std::string CPUS_LIMIT = "cpus_limit";
+const std::string MEM_RSS_BYTES = "mem_rss_bytes";
+const std::string MEM_LIMIT_BYTES = "mem_limit_bytes";
+
+// TODO(bmahler): Deprecated statistical names, these will be removed!
const std::string CPU_TIME = "cpu_time";
const std::string CPU_USAGE = "cpu_usage";
const std::string MEMORY_RSS = "memory_rss";
@@ -61,11 +71,15 @@ void publish(
const ExecutorID& executorId,
const ResourceStatistics& statistics);
-Future<http::Response> _usage(
+Future<http::Response> _statisticsJSON(
const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
const map<string, double>& statistics,
const Option<string>& jsonp);
+Future<http::Response> _usage(
+ const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
+ const map<string, double>& statistics,
+ const Option<string>& jsonp);
Future<Nothing> ResourceMonitorProcess::watch(
const FrameworkID& frameworkId,
@@ -84,9 +98,9 @@ Future<Nothing> ResourceMonitorProcess::watch(
const string& prefix =
strings::join("/", frameworkId.value(), executorId.value(), "");
- process::statistics->meter(
+ ::statistics->meter(
"monitor",
- prefix + CPU_TIME,
+ prefix + CPUS_TIME_SECS,
new meters::TimeRate(prefix + CPU_USAGE));
// Schedule the resource collection.
@@ -105,8 +119,13 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
// In case we've already noticed the executor was terminated,
// we need to archive the statistics first.
- process::statistics->archive("monitor", prefix + MEMORY_RSS);
- process::statistics->archive("monitor", prefix + CPU_TIME);
+ // No need to archive CPU_USAGE as it is implicitly archived along
+ // with CPUS_TIME_SECS.
+ ::statistics->archive("monitor", prefix + CPUS_USER_TIME_SECS);
+ ::statistics->archive("monitor", prefix + CPUS_SYSTEM_TIME_SECS);
+ ::statistics->archive("monitor", prefix + CPUS_LIMIT);
+ ::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
+ ::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
if (!watches.contains(frameworkId) ||
!watches[frameworkId].contains(executorId)) {
@@ -195,20 +214,114 @@ void publish(
const string& prefix =
strings::join("/", frameworkId.value(), executorId.value(), "");
- // Publish memory statistic.
- process::statistics->set(
+ // Publish cpu usage statistics.
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_USER_TIME_SECS,
+ statistics.cpus_user_time_secs(),
+ time);
+ ::statistics->set(
"monitor",
- prefix + MEMORY_RSS,
- statistics.memory_rss(),
+ prefix + CPUS_SYSTEM_TIME_SECS,
+ statistics.cpus_system_time_secs(),
+ time);
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_LIMIT,
+ statistics.cpus_limit(),
+ time);
+ // The applied meter from watch() will publish the cpu usage.
+ ::statistics->set(
+ "monitor",
+ prefix + CPUS_TIME_SECS,
+ statistics.cpus_user_time_secs() + statistics.cpus_system_time_secs(),
time);
- // Publish cpu usage statistics. The applied meter from watch()
- // will publish the cpu usage percentage.
- process::statistics->set(
+ // Publish memory statistics.
+ ::statistics->set(
"monitor",
- prefix + CPU_TIME,
- statistics.cpu_user_time() + statistics.cpu_system_time(),
+ prefix + MEM_RSS_BYTES,
+ statistics.mem_rss_bytes(),
time);
+ ::statistics->set(
+ "monitor",
+ prefix + MEM_LIMIT_BYTES,
+ statistics.mem_limit_bytes(),
+ time);
+}
+
+
+Future<http::Response> ResourceMonitorProcess::statisticsJSON(
+ const http::Request& request)
+{
+ lambda::function<Future<http::Response>(const map<string, double>&)>
+ _statisticsJSON = lambda::bind(
+ slave::_statisticsJSON,
+ watches,
+ lambda::_1,
+ request.query.get("jsonp"));
+
+ return ::statistics->get("monitor").then(_statisticsJSON);
+}
+
+
+Future<http::Response> _statisticsJSON(
+ const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& watches,
+ const map<string, double>& statistics,
+ const Option<string>& jsonp)
+{
+ JSON::Array result;
+
+ foreachkey (const FrameworkID& frameworkId, watches) {
+ foreachkey (const ExecutorID& executorId, watches.get(frameworkId).get()) {
+ const ExecutorInfo& info =
+ watches.get(frameworkId).get().get(executorId).get();
+ const string& prefix =
+ strings::join("/", frameworkId.value(), executorId.value(), "");
+
+ // Export zero values by default.
+ JSON::Object usage;
+ usage.values[CPUS_USER_TIME_SECS] = 0;
+ usage.values[CPUS_SYSTEM_TIME_SECS] = 0;
+ usage.values[CPUS_LIMIT] = 0;
+ usage.values[MEM_RSS_BYTES] = 0;
+ usage.values[MEM_LIMIT_BYTES] = 0;
+
+ // Set the cpu usage data if present.
+ if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
+ usage.values[CPUS_USER_TIME_SECS] =
+ statistics.find(prefix + CPUS_USER_TIME_SECS)->second;
+ }
+ if (statistics.count(prefix + CPUS_SYSTEM_TIME_SECS) > 0) {
+ usage.values[CPUS_SYSTEM_TIME_SECS] =
+ statistics.find(prefix + CPUS_SYSTEM_TIME_SECS)->second;
+ }
+ if (statistics.count(prefix + CPUS_LIMIT) > 0) {
+ usage.values[CPUS_LIMIT] = statistics.find(prefix + CPUS_LIMIT)->second;
+ }
+
+ // Set the memory usage data if present.
+ if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
+ usage.values[MEM_RSS_BYTES] =
+ statistics.find(prefix + MEM_RSS_BYTES)->second;
+ }
+ if (statistics.count(prefix + MEM_LIMIT_BYTES) > 0) {
+ usage.values[MEM_LIMIT_BYTES] =
+ statistics.find(prefix + MEM_LIMIT_BYTES)->second;
+ }
+
+ JSON::Object entry;
+ entry.values["framework_id"] = frameworkId.value();
+ entry.values["executor_id"] = executorId.value();
+ entry.values["executor_name"] = info.name();
+ entry.values["source"] = info.source();
+ entry.values["statistics"] = usage;
+
+ result.values.push_back(entry);
+ }
+ }
+
+ return http::OK(result, jsonp);
}
@@ -222,7 +335,7 @@ Future<http::Response> ResourceMonitorProcess::usage(
lambda::_1,
request.query.get("jsonp"));
- return process::statistics->get("monitor").then(_usage);
+ return ::statistics->get("monitor").then(_usage);
}
@@ -250,11 +363,13 @@ Future<http::Response> _usage(
if (statistics.count(prefix + CPU_USAGE) > 0) {
usage.values[CPU_USAGE] = statistics.find(prefix + CPU_USAGE)->second;
}
- if (statistics.count(prefix + CPU_TIME) > 0) {
- usage.values[CPU_TIME] = statistics.find(prefix + CPU_TIME)->second;
+ if (statistics.count(prefix + CPUS_TIME_SECS) > 0) {
+ usage.values[CPU_TIME] =
+ statistics.find(prefix + CPUS_TIME_SECS)->second;
}
- if (statistics.count(prefix + MEMORY_RSS) > 0) {
- usage.values[MEMORY_RSS] = statistics.find(prefix + MEMORY_RSS)->second;
+ if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
+ usage.values[MEMORY_RSS] =
+ statistics.find(prefix + MEM_RSS_BYTES)->second;
}
JSON::Object entry;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 8770dc0..e5fdcbd 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -44,6 +44,10 @@ class ResourceMonitorProcess;
// Provides resource monitoring for executors. Resource usage time
// series are stored using the Statistics module. Usage information
// is also exported via a JSON endpoint.
+// TODO(bmahler): Once the deprecated usage.json endpoint is removed,
+// clean this up!! It will be possible to drive collection directly
+// via the http endpoints, and the isolator can return all
+// information in one request.
// TODO(bmahler): Forward usage information to the master.
// TODO(bmahler): Consider pulling out the resource collection into
// a Collector abstraction. The monitor can then become a true
@@ -98,6 +102,7 @@ public:
protected:
virtual void initialize()
{
+ route("/statistics.json", &ResourceMonitorProcess::statisticsJSON);
route("/usage.json", &ResourceMonitorProcess::usage);
}
@@ -113,6 +118,11 @@ private:
const ExecutorID& executorId,
const Duration& interval);
+ // Returns the monitoring statistics. The only request parameter
+ // read is the optional 'jsonp' query parameter.
+ process::Future<process::http::Response> statisticsJSON(
+ const process::http::Request& request);
+
+ // TODO(bmahler): Deprecated.
// Returns the usage information. Requests have no parameters.
process::Future<process::http::Response> usage(
const process::http::Request& request);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index d4f7b76..b54bf7e 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -296,6 +296,19 @@ void ProcessIsolator::resourcesChanged(
const Resources& resources)
{
CHECK(initialized) << "Cannot do resourcesChanged before initialization!";
+
+ if (!infos.contains(frameworkId) ||
+ !infos[frameworkId].contains(executorId) ||
+ infos[frameworkId][executorId]->killed) {
+ LOG(INFO) << "Asked to update resources for an unknown/killed executor '"
+ << executorId << "' of framework " << frameworkId;
+ return;
+ }
+
+ ProcessInfo* info = CHECK_NOTNULL(infos[frameworkId][executorId]);
+
+ info->resources = resources;
+
// Do nothing; subclasses may override this.
}
@@ -365,6 +378,17 @@ Future<ResourceStatistics> ProcessIsolator::usage(
result.set_timestamp(Clock::now().secs());
+ // Set the resource allocations.
+ Option<Bytes> mem = info->resources.mem();
+ if (mem.isSome()) {
+ result.set_mem_limit_bytes(mem.get().bytes());
+ }
+
+ Option<double> cpus = info->resources.cpus();
+ if (cpus.isSome()) {
+ result.set_cpus_limit(cpus.get());
+ }
+
#ifdef __linux__
// Get the page size, used for memory accounting.
// NOTE: This is more portable than using getpagesize().
@@ -384,9 +408,11 @@ Future<ResourceStatistics> ProcessIsolator::usage(
return Future<ResourceStatistics>::failed(status.error());
}
- result.set_memory_rss(status.get().rss * pageSize);
- result.set_cpu_user_time((double) status.get().utime / (double) ticks);
- result.set_cpu_system_time((double) status.get().stime / (double) ticks);
+ result.set_mem_rss_bytes(status.get().rss * pageSize);
+ result.set_cpus_user_time_secs(
+ (double) status.get().utime / (double) ticks);
+ result.set_cpus_system_time_secs(
+ (double) status.get().stime / (double) ticks);
// Now aggregate all descendant process usage statistics.
Try<set<pid_t> > children = proc::children(info->pid.get(), true);
@@ -408,16 +434,15 @@ Future<ResourceStatistics> ProcessIsolator::usage(
continue;
}
- result.set_memory_rss(
- result.memory_rss() +
- status.get().rss * pageSize);
+ result.set_mem_rss_bytes(
+ result.mem_rss_bytes() + status.get().rss * pageSize);
- result.set_cpu_user_time(
- result.cpu_user_time() +
+ result.set_cpus_user_time_secs(
+ result.cpus_user_time_secs() +
(double) status.get().utime / (double) ticks);
- result.set_cpu_system_time(
- result.cpu_system_time() +
+ result.set_cpus_system_time_secs(
+ result.cpus_system_time_secs() +
(double) status.get().stime / (double) ticks);
}
#elif defined __APPLE__
@@ -441,11 +466,11 @@ Future<ResourceStatistics> ProcessIsolator::usage(
"Failed to get proc_pidinfo: " + stringify(size));
}
- result.set_memory_rss(task.pti_resident_size);
+ result.set_mem_rss_bytes(task.pti_resident_size);
// NOTE: CPU Times are in nanoseconds, but this is not documented!
- result.set_cpu_user_time(Nanoseconds(task.pti_total_user).secs());
- result.set_cpu_system_time(Nanoseconds(task.pti_total_system).secs());
+ result.set_cpus_user_time_secs(Nanoseconds(task.pti_total_user).secs());
+ result.set_cpus_system_time_secs(Nanoseconds(task.pti_total_system).secs());
#endif
return result;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index 9875f4a..ee693f6 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -101,6 +101,7 @@ private:
ExecutorID executorId;
Option<pid_t> pid; // PID of the forked executor process.
bool killed; // True if "killing" has been initiated via 'killExecutor'.
+ Resources resources; // Resources allocated to the process tree.
};
// TODO(benh): Make variables const by passing them via constructor.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/tests/isolator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/isolator_tests.cpp b/src/tests/isolator_tests.cpp
index aae8b2f..7013fa2 100644
--- a/src/tests/isolator_tests.cpp
+++ b/src/tests/isolator_tests.cpp
@@ -119,6 +119,12 @@ TYPED_TEST(IsolatorTest, Usage)
task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ Resources resources(offers.get()[0].resources());
+ Option<Bytes> mem = resources.mem();
+ ASSERT_SOME(mem);
+ Option<double> cpus = resources.cpus();
+ ASSERT_SOME(cpus);
+
const std::string& file = path::join(flags.work_dir, "ready");
// This task induces user/system load in a child process by
@@ -172,9 +178,9 @@ TYPED_TEST(IsolatorTest, Usage)
statistics = usage.get();
// If we meet our usage expectations, we're done!
- if (statistics.memory_rss() >= 1024u &&
- statistics.cpu_user_time() >= 0.125 &&
- statistics.cpu_system_time() >= 0.125) {
+ if (statistics.cpus_user_time_secs() >= 0.125 &&
+ statistics.cpus_system_time_secs() >= 0.125 &&
+ statistics.mem_rss_bytes() >= 1024u) {
break;
}
@@ -182,9 +188,12 @@ TYPED_TEST(IsolatorTest, Usage)
waited += Milliseconds(100);
} while (waited < Seconds(10));
- EXPECT_GE(statistics.memory_rss(), 1024u);
- EXPECT_GE(statistics.cpu_user_time(), 0.125);
- EXPECT_GE(statistics.cpu_system_time(), 0.125);
+
+ EXPECT_GE(statistics.cpus_user_time_secs(), 0.125);
+ EXPECT_GE(statistics.cpus_system_time_secs(), 0.125);
+ EXPECT_EQ(statistics.cpus_limit(), cpus.get());
+ EXPECT_GE(statistics.mem_rss_bytes(), 1024u);
+ EXPECT_EQ(statistics.mem_limit_bytes(), mem.get().bytes());
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/1523e5e0/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 53920a0..3142416 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -72,21 +72,34 @@ TEST(MonitorTest, WatchUnwatch)
executorInfo.set_name("name");
executorInfo.set_source("source");
+ ResourceStatistics initialStatistics;
+ initialStatistics.set_cpus_user_time_secs(0);
+ initialStatistics.set_cpus_system_time_secs(0);
+ initialStatistics.set_cpus_limit(1.0);
+ initialStatistics.set_mem_rss_bytes(0);
+ initialStatistics.set_mem_limit_bytes(2048);
+ initialStatistics.set_timestamp(Clock::now().secs());
+
ResourceStatistics statistics;
- statistics.set_cpu_user_time(5);
- statistics.set_cpu_system_time(1);
- statistics.set_memory_rss(1024);
- statistics.set_timestamp(Clock::now().secs());
+ statistics.set_cpus_user_time_secs(4);
+ statistics.set_cpus_system_time_secs(1);
+ statistics.set_cpus_limit(1.0);
+ statistics.set_mem_rss_bytes(1024);
+ statistics.set_mem_limit_bytes(2048);
+ statistics.set_timestamp(
+ initialStatistics.timestamp() +
+ slave::RESOURCE_MONITORING_INTERVAL.secs());
TestingIsolator isolator;
process::spawn(isolator);
- Future<Nothing> usage;
+ Future<Nothing> usage1, usage2;
EXPECT_CALL(isolator, usage(frameworkId, executorId))
- .WillOnce(DoAll(FutureSatisfy(&usage),
+ .WillOnce(DoAll(FutureSatisfy(&usage1),
+ Return(initialStatistics)))
+ .WillOnce(DoAll(FutureSatisfy(&usage2),
Return(statistics)));
-
slave::ResourceMonitor monitor(&isolator);
// We pause the clock first in order to make sure that we can
@@ -107,7 +120,16 @@ TEST(MonitorTest, WatchUnwatch)
process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
process::Clock::settle();
- AWAIT_READY(usage);
+ AWAIT_READY(usage1);
+
+ // Wait until the isolator has finished returning the statistics.
+ process::Clock::settle();
+
+ // The second collection will populate the cpu_usage.
+ process::Clock::advance(slave::RESOURCE_MONITORING_INTERVAL);
+ process::Clock::settle();
+
+ AWAIT_READY(usage2);
// Wait until the isolator has finished returning the statistics.
process::Clock::settle();
@@ -136,9 +158,42 @@ TEST(MonitorTest, WatchUnwatch)
"},"
"\"source\":\"source\""
"}]",
- statistics.cpu_user_time() + statistics.cpu_system_time(),
- statistics.cpu_usage(),
- statistics.memory_rss()).get(),
+ statistics.cpus_system_time_secs() + statistics.cpus_user_time_secs(),
+ (statistics.cpus_system_time_secs() +
+ statistics.cpus_user_time_secs()) /
+ slave::RESOURCE_MONITORING_INTERVAL.secs(),
+ statistics.mem_rss_bytes()).get(),
+ response);
+
+ response = process::http::get(upid, "statistics.json");
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ(
+ "application/json",
+ "Content-Type",
+ response);
+
+ // TODO(bmahler): Verify metering directly through statistics.
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(
+ strings::format(
+ "[{"
+ "\"executor_id\":\"executor\","
+ "\"executor_name\":\"name\","
+ "\"framework_id\":\"framework\","
+ "\"source\":\"source\","
+ "\"statistics\":{"
+ "\"cpus_limit\":%g,"
+ "\"cpus_system_time_secs\":%g,"
+ "\"cpus_user_time_secs\":%g,"
+ "\"mem_limit_bytes\":%lu,"
+ "\"mem_rss_bytes\":%lu"
+ "}"
+ "}]",
+ statistics.cpus_limit(),
+ statistics.cpus_system_time_secs(),
+ statistics.cpus_user_time_secs(),
+ statistics.mem_limit_bytes(),
+ statistics.mem_rss_bytes()).get(),
response);
// Ensure the monitor stops polling the isolator.