You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:41:55 UTC
[2/8] git commit: Refactored the ResourceMonitor to use TimeSeries
directly instead of Statistics.
Refactored the ResourceMonitor to use TimeSeries directly instead of Statistics.
Review: https://reviews.apache.org/r/13604
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c5932908
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c5932908
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c5932908
Branch: refs/heads/master
Commit: c5932908d858f8a3e549a1d9081f89a0bf26368c
Parents: 157e011
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jul 23 20:11:23 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800
----------------------------------------------------------------------
src/slave/monitor.cpp | 298 ++++++++-------------------------------
src/slave/monitor.hpp | 39 ++++-
src/tests/monitor_tests.cpp | 18 ++-
3 files changed, 108 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/slave/monitor.cpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.cpp b/src/slave/monitor.cpp
index b41633f..864c1d5 100644
--- a/src/slave/monitor.cpp
+++ b/src/slave/monitor.cpp
@@ -31,14 +31,14 @@
#include <stout/json.hpp>
#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
#include "slave/isolator.hpp"
#include "slave/monitor.hpp"
using namespace process;
-using process::statistics;
-
+using std::make_pair;
using std::map;
using std::string;
@@ -48,34 +48,13 @@ namespace slave {
using process::wait; // Necessary on some OS's to disambiguate.
-// Resource statistics constants.
-// These match the names in the ResourceStatistics protobuf.
-// TODO(bmahler): Later, when we have a richer monitoring story,
-// we will want to publish these outside of this file.
-// TODO(cdel): Check if we need any more of the cgroup stats.
-const std::string CPUS_USER_TIME_SECS = "cpus_user_time_secs";
-const std::string CPUS_SYSTEM_TIME_SECS = "cpus_system_time_secs";
-const std::string CPUS_LIMIT = "cpus_limit";
-const std::string MEM_RSS_BYTES = "mem_rss_bytes";
-const std::string MEM_LIMIT_BYTES = "mem_limit_bytes";
-const std::string MEM_FILE_BYTES = "mem_file_bytes";
-const std::string MEM_ANON_BYTES = "mem_anon_bytes";
-const std::string MEM_MAPPED_FILE_BYTES = "mem_mapped_file_bytes";
-const std::string CPUS_NR_PERIODS = "cpus_nr_periods";
-const std::string CPUS_NR_THROTTLED = "cpus_nr_throttled";
-const std::string CPUS_THROTTLED_TIME_SECS = "cpus_throttled_time_secs";
-
-
-// Local function prototypes.
-void publish(
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const ResourceStatistics& statistics);
-Future<http::Response> _statisticsJSON(
- const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& executors,
- const map<string, double>& statistics,
- const Option<string>& jsonp);
+// TODO(bmahler): Consider exposing these as flags should the
+// need arise. These are conservative for the initial version.
+const Duration MONITORING_TIME_SERIES_WINDOW = Weeks(2);
+const size_t MONITORING_TIME_SERIES_CAPACITY = 1000;
+const size_t MONITORING_ARCHIVED_TIME_SERIES = 25;
+
Future<Nothing> ResourceMonitorProcess::watch(
const FrameworkID& frameworkId,
@@ -83,12 +62,15 @@ Future<Nothing> ResourceMonitorProcess::watch(
const ExecutorInfo& executorInfo,
const Duration& interval)
{
- if (watches.contains(frameworkId) &&
- watches[frameworkId].contains(executorId)) {
+ if (executors.contains(frameworkId) &&
+ executors[frameworkId].contains(executorId)) {
return Failure("Already watched");
}
- watches[frameworkId][executorId] = executorInfo;
+ executors[frameworkId][executorId] =
+ MonitoringInfo(executorInfo,
+ MONITORING_TIME_SERIES_WINDOW,
+ MONITORING_TIME_SERIES_CAPACITY);
// Schedule the resource collection.
delay(interval, self(), &Self::collect, frameworkId, executorId, interval);
@@ -101,34 +83,17 @@ Future<Nothing> ResourceMonitorProcess::unwatch(
const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
- const string& prefix =
- strings::join("/", frameworkId.value(), executorId.value(), "");
-
- // In case we've already noticed the executor was terminated,
- // we need to archive the statistics first.
- // No need to archive CPUS_USAGE as it is implicitly archived along
- // with CPUS_TIME_SECS.
- ::statistics->archive("monitor", prefix + CPUS_USER_TIME_SECS);
- ::statistics->archive("monitor", prefix + CPUS_SYSTEM_TIME_SECS);
- ::statistics->archive("monitor", prefix + CPUS_LIMIT);
- ::statistics->archive("monitor", prefix + MEM_RSS_BYTES);
- ::statistics->archive("monitor", prefix + MEM_LIMIT_BYTES);
- ::statistics->archive("monitor", prefix + MEM_FILE_BYTES);
- ::statistics->archive("monitor", prefix + MEM_ANON_BYTES);
- ::statistics->archive("monitor", prefix + MEM_MAPPED_FILE_BYTES);
- ::statistics->archive("monitor", prefix + CPUS_NR_PERIODS);
- ::statistics->archive("monitor", prefix + CPUS_NR_THROTTLED);
- ::statistics->archive("monitor", prefix + CPUS_THROTTLED_TIME_SECS);
-
- if (!watches.contains(frameworkId) ||
- !watches[frameworkId].contains(executorId)) {
+ if (!executors.contains(frameworkId) ||
+ !executors[frameworkId].contains(executorId)) {
return Failure("Not watched");
}
- watches[frameworkId].erase(executorId);
+ // Add the monitoring information to the archive.
+ archive.push_back(executors[frameworkId][executorId]);
+ executors[frameworkId].erase(executorId);
- if (watches[frameworkId].empty()) {
- watches.erase(frameworkId);
+ if (executors[frameworkId].empty()) {
+ executors.erase(frameworkId);
}
return Nothing();
@@ -141,8 +106,8 @@ void ResourceMonitorProcess::collect(
const Duration& interval)
{
// Has the executor been unwatched?
- if (!watches.contains(frameworkId) ||
- !watches[frameworkId].contains(executorId)) {
+ if (!executors.contains(frameworkId) ||
+ !executors[frameworkId].contains(executorId)) {
return;
}
@@ -163,23 +128,35 @@ void ResourceMonitorProcess::_collect(
const Duration& interval)
{
// Has the executor been unwatched?
- if (!watches.contains(frameworkId) ||
- !watches[frameworkId].contains(executorId)) {
+ if (!executors.contains(frameworkId) ||
+ !executors[frameworkId].contains(executorId)) {
return;
}
- if (statistics.isReady()) {
- // Publish the data to the statistics module.
- VLOG(1) << "Publishing resource usage for executor '" << executorId
+ if (statistics.isDiscarded()) {
+ // Note that the isolator may have terminated, causing pending
+ // dispatches to be deleted.
+ VLOG(1) << "Ignoring discarded future collecting resource usage for "
+ << "executor '" << executorId
<< "' of framework '" << frameworkId << "'";
- publish(frameworkId, executorId, statistics.get());
- } else {
- // Note that the isolator might have been terminated and pending
- // dispatches deleted, causing the future to get discarded.
+ } else if (statistics.isFailed()) {
+ // TODO(bmahler): Have the Isolators discard the result when the
+ // executor was killed or completed.
VLOG(1)
<< "Failed to collect resource usage for executor '" << executorId
- << "' of framework '" << frameworkId << "': "
- << (statistics.isFailed() ? statistics.failure() : "Future discarded");
+ << "' of framework '" << frameworkId << "': " << statistics.failure();
+ } else {
+ Try<Time> time = Time::create(statistics.get().timestamp());
+
+ if (time.isError()) {
+ LOG(ERROR) << "Invalid timestamp " << statistics.get().timestamp()
+ << " for executor '" << executorId
+ << "' of framework '" << frameworkId << ": " << time.error();
+ } else {
+ // Add the statistics to the time series.
+ executors[frameworkId][executorId].statistics.set(
+ statistics.get(), time.get());
+ }
}
// Schedule the next collection.
@@ -187,192 +164,37 @@ void ResourceMonitorProcess::_collect(
}
-// TODO(bmahler): With slave recovery, executor uuid's will be exposed
-// to the isolator. This means that we will be able to publish
-// statistics per executor run, rather than across all runs.
-void publish(
- const FrameworkID& frameworkId,
- const ExecutorID& executorId,
- const ResourceStatistics& statistics)
-{
- Try<Time> time_ = Time::create(statistics.timestamp());
- if (time_.isError()) {
- LOG(ERROR) << "Not publishing the statistics because we cannot create a "
- << "Duration from its timestamp: " << time_.error();
- return;
- }
-
- Time time = time_.get();
-
- const string& prefix =
- strings::join("/", frameworkId.value(), executorId.value(), "");
-
- // Publish cpu usage statistics.
- ::statistics->set(
- "monitor",
- prefix + CPUS_USER_TIME_SECS,
- statistics.cpus_user_time_secs(),
- time);
- ::statistics->set(
- "monitor",
- prefix + CPUS_SYSTEM_TIME_SECS,
- statistics.cpus_system_time_secs(),
- time);
- ::statistics->set(
- "monitor",
- prefix + CPUS_LIMIT,
- statistics.cpus_limit(),
- time);
-
- // Publish memory statistics.
- ::statistics->set(
- "monitor",
- prefix + MEM_RSS_BYTES,
- statistics.mem_rss_bytes(),
- time);
- ::statistics->set(
- "monitor",
- prefix + MEM_LIMIT_BYTES,
- statistics.mem_limit_bytes(),
- time);
- ::statistics->set(
- "monitor",
- prefix + MEM_FILE_BYTES,
- statistics.mem_file_bytes(),
- time);
- ::statistics->set(
- "monitor",
- prefix + MEM_ANON_BYTES,
- statistics.mem_anon_bytes(),
- time);
- ::statistics->set(
- "monitor",
- prefix + MEM_MAPPED_FILE_BYTES,
- statistics.mem_mapped_file_bytes(),
- time);
-
- // Publish cpu.stat statistics.
- ::statistics->set(
- "monitor",
- prefix + CPUS_NR_PERIODS,
- statistics.cpus_nr_periods(),
- time);
- ::statistics->set(
- "monitor",
- prefix + CPUS_NR_THROTTLED,
- statistics.cpus_nr_throttled(),
- time);
- ::statistics->set(
- "monitor",
- prefix + CPUS_THROTTLED_TIME_SECS,
- statistics.cpus_throttled_time_secs(),
- time);
-}
-
-
Future<http::Response> ResourceMonitorProcess::statisticsJSON(
const http::Request& request)
{
- lambda::function<Future<http::Response>(const map<string, double>&)>
- _statisticsJSON = lambda::bind(
- slave::_statisticsJSON,
- watches,
- lambda::_1,
- request.query.get("jsonp"));
-
- return ::statistics->get("monitor").then(_statisticsJSON);
-}
-
-
-Future<http::Response> _statisticsJSON(
- const hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> >& watches,
- const map<string, double>& statistics,
- const Option<string>& jsonp)
-{
JSON::Array result;
- foreachkey (const FrameworkID& frameworkId, watches) {
- foreachkey (const ExecutorID& executorId, watches.get(frameworkId).get()) {
- const ExecutorInfo& info =
- watches.get(frameworkId).get().get(executorId).get();
- const string& prefix =
- strings::join("/", frameworkId.value(), executorId.value(), "");
-
- // Export zero values by default.
- JSON::Object usage;
- usage.values[CPUS_USER_TIME_SECS] = 0;
- usage.values[CPUS_SYSTEM_TIME_SECS] = 0;
- usage.values[CPUS_LIMIT] = 0;
- usage.values[MEM_RSS_BYTES] = 0;
- usage.values[MEM_LIMIT_BYTES] = 0;
- usage.values[MEM_FILE_BYTES] = 0;
- usage.values[MEM_ANON_BYTES] = 0;
- usage.values[MEM_MAPPED_FILE_BYTES] = 0;
- usage.values[CPUS_NR_PERIODS] = 0;
- usage.values[CPUS_NR_THROTTLED] = 0;
- usage.values[CPUS_THROTTLED_TIME_SECS] = 0;
-
- // Set the cpu usage data if present.
- if (statistics.count(prefix + CPUS_USER_TIME_SECS) > 0) {
- usage.values[CPUS_USER_TIME_SECS] =
- statistics.find(prefix + CPUS_USER_TIME_SECS)->second;
- }
- if (statistics.count(prefix + CPUS_SYSTEM_TIME_SECS) > 0) {
- usage.values[CPUS_SYSTEM_TIME_SECS] =
- statistics.find(prefix + CPUS_SYSTEM_TIME_SECS)->second;
- }
- if (statistics.count(prefix + CPUS_LIMIT) > 0) {
- usage.values[CPUS_LIMIT] = statistics.find(prefix + CPUS_LIMIT)->second;
- }
+ foreachkey (const FrameworkID& frameworkId, executors) {
+ foreachkey (const ExecutorID& executorId, executors[frameworkId]) {
+ const TimeSeries<ResourceStatistics>& timeseries =
+ executors[frameworkId][executorId].statistics;
- // Set the memory usage data if present.
- if (statistics.count(prefix + MEM_RSS_BYTES) > 0) {
- usage.values[MEM_RSS_BYTES] =
- statistics.find(prefix + MEM_RSS_BYTES)->second;
- }
- if (statistics.count(prefix + MEM_LIMIT_BYTES) > 0) {
- usage.values[MEM_LIMIT_BYTES] =
- statistics.find(prefix + MEM_LIMIT_BYTES)->second;
- }
- if (statistics.count(prefix + MEM_FILE_BYTES) > 0) {
- usage.values[MEM_FILE_BYTES] =
- statistics.find(prefix + MEM_FILE_BYTES)->second;
- }
- if (statistics.count(prefix + MEM_ANON_BYTES) > 0) {
- usage.values[MEM_ANON_BYTES] =
- statistics.find(prefix + MEM_ANON_BYTES)->second;
- }
- if (statistics.count(prefix + MEM_MAPPED_FILE_BYTES) > 0) {
- usage.values[MEM_MAPPED_FILE_BYTES] =
- statistics.find(prefix + MEM_MAPPED_FILE_BYTES)->second;
+ if (timeseries.empty()) {
+ continue;
}
- // Set the cpu.stat data if present.
- if (statistics.count(prefix + CPUS_NR_PERIODS) > 0) {
- usage.values[CPUS_NR_PERIODS] =
- statistics.find(prefix + CPUS_NR_PERIODS)->second;
- }
- if (statistics.count(prefix + CPUS_NR_THROTTLED) > 0) {
- usage.values[CPUS_NR_THROTTLED] =
- statistics.find(prefix + CPUS_NR_THROTTLED)->second;
- }
- if (statistics.count(prefix + CPUS_THROTTLED_TIME_SECS) > 0) {
- usage.values[CPUS_THROTTLED_TIME_SECS] =
- statistics.find(prefix + CPUS_THROTTLED_TIME_SECS)->second;
- }
+ const ExecutorInfo& executorInfo =
+ executors[frameworkId][executorId].executorInfo;
JSON::Object entry;
entry.values["framework_id"] = frameworkId.value();
entry.values["executor_id"] = executorId.value();
- entry.values["executor_name"] = info.name();
- entry.values["source"] = info.source();
- entry.values["statistics"] = usage;
+ entry.values["executor_name"] = executorInfo.name();
+ entry.values["source"] = executorInfo.source();
+
+ const ResourceStatistics& statistics = timeseries.latest().get().second;
+ entry.values["statistics"] = JSON::Protobuf(statistics);
result.values.push_back(entry);
}
}
- return http::OK(result, jsonp);
+ return http::OK(result, request.query.get("jsonp"));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/slave/monitor.hpp
----------------------------------------------------------------------
diff --git a/src/slave/monitor.hpp b/src/slave/monitor.hpp
index 8156454..20cf20e 100644
--- a/src/slave/monitor.hpp
+++ b/src/slave/monitor.hpp
@@ -21,10 +21,14 @@
#include <string>
+#include <boost/circular_buffer.hpp>
+
#include <mesos/mesos.hpp>
#include <process/future.hpp>
+#include <process/statistics.hpp>
+#include <stout/cache.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/nothing.hpp>
@@ -42,6 +46,13 @@ class Isolator;
class ResourceMonitorProcess;
+const extern Duration MONITORING_TIME_SERIES_WINDOW;
+const extern size_t MONITORING_TIME_SERIES_CAPACITY;
+
+// Number of time series to maintain for completed executors.
+const extern size_t MONITORING_ARCHIVED_TIME_SERIES;
+
+
// Provides resource monitoring for executors. Resource usage time
// series are stored using the Statistics module. Usage information
// is also exported via a JSON endpoint.
@@ -82,7 +93,9 @@ class ResourceMonitorProcess : public process::Process<ResourceMonitorProcess>
{
public:
ResourceMonitorProcess(Isolator* _isolator)
- : ProcessBase("monitor"), isolator(_isolator) {}
+ : ProcessBase("monitor"),
+ isolator(_isolator),
+ archive(MONITORING_ARCHIVED_TIME_SERIES) {}
virtual ~ResourceMonitorProcess() {}
@@ -100,6 +113,10 @@ protected:
virtual void initialize()
{
route("/statistics.json", None(), &ResourceMonitorProcess::statisticsJSON);
+
+ // TODO(bmahler): Add an archive.json endpoint that exposes
+ // historical information, once we have path parameters for
+ // routes.
}
private:
@@ -120,8 +137,24 @@ private:
Isolator* isolator;
- // The executor info is stored for each watched executor.
- hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > watches;
+ // Monitoring information for an executor.
+ struct MonitoringInfo {
+ // boost::circular_buffer needs a default constructor.
+ MonitoringInfo() {}
+
+ MonitoringInfo(const ExecutorInfo& _executorInfo,
+ const Duration& window,
+ size_t capacity)
+ : executorInfo(_executorInfo), statistics(window, capacity) {}
+
+ ExecutorInfo executorInfo; // Non-const for assignability.
+ process::TimeSeries<ResourceStatistics> statistics;
+ };
+
+ hashmap<FrameworkID, hashmap<ExecutorID, MonitoringInfo> > executors;
+
+ // Fixed-size history of monitoring information.
+ boost::circular_buffer<MonitoringInfo> archive;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/c5932908/src/tests/monitor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/monitor_tests.cpp b/src/tests/monitor_tests.cpp
index 407ea14..145ef35 100644
--- a/src/tests/monitor_tests.cpp
+++ b/src/tests/monitor_tests.cpp
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+#include <limits>
#include <map>
#include <gmock/gmock.h>
@@ -48,6 +49,7 @@ using process::http::NotFound;
using process::http::OK;
using process::http::Response;
+using std::numeric_limits;
using std::string;
using testing::_;
@@ -55,9 +57,6 @@ using testing::DoAll;
using testing::Return;
-// TODO(bmahler): Add additional tests:
-// 1. Check that the data has been published to statistics.
-// 2. Check that metering is occurring on subsequent resource data.
TEST(MonitorTest, WatchUnwatch)
{
FrameworkID frameworkId;
@@ -73,8 +72,11 @@ TEST(MonitorTest, WatchUnwatch)
executorInfo.set_source("source");
ResourceStatistics initialStatistics;
+ initialStatistics.set_cpus_nr_periods(100);
+ initialStatistics.set_cpus_nr_throttled(2);
initialStatistics.set_cpus_user_time_secs(0);
initialStatistics.set_cpus_system_time_secs(0);
+ initialStatistics.set_cpus_throttled_time_secs(0.5);
initialStatistics.set_cpus_limit(2.5);
initialStatistics.set_mem_rss_bytes(0);
initialStatistics.set_mem_file_bytes(0);
@@ -153,7 +155,8 @@ TEST(MonitorTest, WatchUnwatch)
"Content-Type",
response);
- // TODO(bmahler): Verify metering directly through statistics.
+ // TODO(bmahler): Use JSON equality instead to avoid having to use
+ // numeric limits for double precision.
AWAIT_EXPECT_RESPONSE_BODY_EQ(
strings::format(
"[{"
@@ -172,7 +175,9 @@ TEST(MonitorTest, WatchUnwatch)
"\"mem_file_bytes\":%lu,"
"\"mem_limit_bytes\":%lu,"
"\"mem_mapped_file_bytes\":%lu,"
- "\"mem_rss_bytes\":%lu"
+ "\"mem_rss_bytes\":%lu,"
+ "\"timestamp\":"
+ "%." + stringify(numeric_limits<double>::digits10) + "g"
"}"
"}]",
statistics.cpus_limit(),
@@ -185,7 +190,8 @@ TEST(MonitorTest, WatchUnwatch)
statistics.mem_file_bytes(),
statistics.mem_limit_bytes(),
statistics.mem_mapped_file_bytes(),
- statistics.mem_rss_bytes()).get(),
+ statistics.mem_rss_bytes(),
+ statistics.timestamp()).get(),
response);
// Ensure the monitor stops polling the isolator.