You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/08 21:59:16 UTC
git commit: Added an optional timeout to metrics collection.
Repository: mesos
Updated Branches:
refs/heads/master d2e404864 -> 02e6fa98a
Added an optional timeout to metrics collection.
Review: https://reviews.apache.org/r/21092
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02e6fa98
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02e6fa98
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02e6fa98
Branch: refs/heads/master
Commit: 02e6fa98a2497c05881f3ff62cf6ca3d8a7b368f
Parents: d2e4048
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Thu May 8 12:15:23 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu May 8 12:59:08 2014 -0700
----------------------------------------------------------------------
.../include/process/metrics/metrics.hpp | 4 +
3rdparty/libprocess/src/metrics/metrics.cpp | 52 +++++++++--
3rdparty/libprocess/src/tests/metrics_tests.cpp | 93 ++++++++++++++++++++
3 files changed, 143 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/include/process/metrics/metrics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metrics.hpp b/3rdparty/libprocess/include/process/metrics/metrics.hpp
index d5176ee..0a97f12 100644
--- a/3rdparty/libprocess/include/process/metrics/metrics.hpp
+++ b/3rdparty/libprocess/include/process/metrics/metrics.hpp
@@ -13,6 +13,7 @@
#include <stout/hashmap.hpp>
#include <stout/nothing.hpp>
+#include <stout/option.hpp>
namespace process {
namespace metrics {
@@ -45,8 +46,11 @@ private:
Future<http::Response> snapshot(const http::Request& request);
Future<http::Response> _snapshot(const http::Request& request);
+ static std::list<Future<double> > _snapshotTimeout(
+ const std::list<Future<double> >& futures);
static Future<http::Response> __snapshot(
const http::Request& request,
+ const Option<Duration>& timeout,
const hashmap<std::string, Future<double> >& metrics,
const hashmap<std::string, Option<Statistics<double> > >& statistics);
http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/src/metrics/metrics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/metrics/metrics.cpp b/3rdparty/libprocess/src/metrics/metrics.cpp
index bb36c72..38905d3 100644
--- a/3rdparty/libprocess/src/metrics/metrics.cpp
+++ b/3rdparty/libprocess/src/metrics/metrics.cpp
@@ -51,6 +51,10 @@ string MetricsProcess::help()
"This endpoint provides information regarding the current metrics ",
"tracked by the system.",
"",
+ "The optional query parameter 'timeout' determines the maximum ",
+ "amount of time the endpoint will take to respond. If the timeout ",
+ "is exceeded, some metrics may not be included in the response.",
+ "",
"The key is the metric name, and the value is a double-type."));
}
@@ -87,35 +91,71 @@ Future<http::Response> MetricsProcess::snapshot(const http::Request& request)
Future<http::Response> MetricsProcess::_snapshot(const http::Request& request)
{
+ // Parse the 'timeout' parameter.
+ Option<Duration> timeout;
+
+ if (request.query.contains("timeout")) {
+ string parameter = request.query.get("timeout").get();
+
+ Try<Duration> duration = Duration::parse(parameter);
+
+ if (duration.isError()) {
+ return http::BadRequest(
+ "Invalid timeout '" + parameter + "':" + duration.error() + ".\n");
+ }
+
+ timeout = duration.get();
+ }
+
hashmap<string, Future<double> > futures;
hashmap<string, Option<Statistics<double> > > statistics;
foreachkey (const string& metric, metrics) {
CHECK_NOTNULL(metrics[metric].get());
futures[metric] = metrics[metric]->value();
- // TODO(dhamon): It would be nice to get these in parallel.
+ // TODO(dhamon): It would be nice to compute these asynchronously.
statistics[metric] = metrics[metric]->statistics();
}
- return await(futures.values())
- .then(lambda::bind(__snapshot, request, futures, statistics));
+ if (timeout.isSome()) {
+ return await(futures.values())
+ .after(timeout.get(), lambda::bind(_snapshotTimeout, futures.values()))
+ .then(lambda::bind(__snapshot, request, timeout, futures, statistics));
+ } else {
+ return await(futures.values())
+ .then(lambda::bind(__snapshot, request, timeout, futures, statistics));
+ }
+}
+
+
+list<Future<double> > MetricsProcess::_snapshotTimeout(
+ const list<Future<double> >& futures)
+{
+ // Stop waiting for all futures to transition and return a 'ready'
+ // list to proceed handling the request.
+ return futures;
}
Future<http::Response> MetricsProcess::__snapshot(
const http::Request& request,
+ const Option<Duration>& timeout,
const hashmap<string, Future<double> >& metrics,
const hashmap<string, Option<Statistics<double> > >& statistics)
{
JSON::Object object;
foreachpair (const string& key, const Future<double>& value, metrics) {
- // Value.
- if (value.isReady()) {
+ // TODO(dhamon): Maybe add the failure message for this metric to the
+ // response if value.isFailed().
+ if (value.isPending()) {
+ CHECK_SOME(timeout);
+ VLOG(1) << "Exceeded timeout of " << timeout.get() << " when attempting "
+ << "to get metric '" << key << "'";
+ } else if (value.isReady()) {
object.values[key] = value.get();
}
- // Statistics.
Option<Statistics<double> > statistics_ = statistics.get(key).get();
if (statistics_.isSome()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 9f04b9b..8a13121 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -18,6 +18,7 @@
using namespace process;
+using process::http::BadRequest;
using process::http::OK;
using process::http::Response;
@@ -38,6 +39,11 @@ public:
{
return Failure("failure");
}
+
+ Future<double> pending()
+ {
+ return Future<double>();
+ }
};
@@ -155,9 +161,11 @@ TEST(Metrics, Snapshot)
ASSERT_TRUE(pid);
Gauge gauge("test/gauge", defer(pid, &GaugeProcess::get));
+ Gauge gaugeFail("test/gauge_fail", defer(pid, &GaugeProcess::fail));
Counter counter("test/counter");
AWAIT_READY(metrics::add(gauge));
+ AWAIT_READY(metrics::add(gaugeFail));
AWAIT_READY(metrics::add(counter));
// Advance the clock to avoid rate limit.
@@ -165,21 +173,106 @@ TEST(Metrics, Snapshot)
// Get the snapshot.
response = http::get(upid, "snapshot");
+
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
JSON::Object expected;
expected.values["test/counter"] = 0.0;
expected.values["test/gauge"] = 42.0;
+
AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
// Remove the metrics and ensure they are no longer in the snapshot.
AWAIT_READY(metrics::remove(gauge));
+ AWAIT_READY(metrics::remove(gaugeFail));
AWAIT_READY(metrics::remove(counter));
// Advance the clock to avoid rate limit.
Clock::advance(Seconds(1));
response = http::get(upid, "snapshot");
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+ terminate(process);
+ wait(process);
+}
+
+
+TEST(Metrics, SnapshotTimeout)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ UPID upid("metrics", process::ip(), process::port());
+
+ Clock::pause();
+
+ // Advance the clock to avoid rate limit.
+ Clock::advance(Seconds(1));
+
+ // Ensure the timeout parameter is validated.
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+ BadRequest().status,
+ http::get(upid, "snapshot?timeout=foobar"));
+
+ // Advance the clock to avoid rate limit.
+ Clock::advance(Seconds(1));
+
+ // Before adding any metrics, the response should be empty.
+ Future<Response> response = http::get(upid, "snapshot?timeout=2secs");
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+ // Add a gauge and a counter.
+ GaugeProcess process;
+ PID<GaugeProcess> pid = spawn(&process);
+ ASSERT_TRUE(pid);
+
+ Gauge gauge("test/gauge", defer(pid, &GaugeProcess::get));
+ Gauge gaugeFail("test/gauge_fail", defer(pid, &GaugeProcess::fail));
+ Gauge gaugeTimeout("test/gauge_timeout", defer(pid, &GaugeProcess::pending));
+ Counter counter("test/counter");
+
+ AWAIT_READY(metrics::add(gauge));
+ AWAIT_READY(metrics::add(gaugeFail));
+ AWAIT_READY(metrics::add(gaugeTimeout));
+ AWAIT_READY(metrics::add(counter));
+
+ // Advance the clock to avoid rate limit.
+ Clock::advance(Seconds(1));
+
+ // Get the snapshot.
+ response = http::get(upid, "snapshot?timeout=2secs");
+
+ // Make sure the request is pending before the timeout is exceeded.
+ Clock::settle();
+
+ ASSERT_TRUE(response.isPending());
+
+ // Advance the clock to trigger the timeout.
+ Clock::advance(Seconds(2));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ JSON::Object expected;
+ expected.values["test/counter"] = 0.0;
+ expected.values["test/gauge"] = 42.0;
+
+ AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
+
+ // Remove the metrics and ensure they are no longer in the snapshot.
+ AWAIT_READY(metrics::remove(gauge));
+ AWAIT_READY(metrics::remove(gaugeFail));
+ AWAIT_READY(metrics::remove(gaugeTimeout));
+ AWAIT_READY(metrics::remove(counter));
+
+ // Advance the clock to avoid rate limit.
+ Clock::advance(Seconds(1));
+
+ response = http::get(upid, "snapshot?timeout=2secs");
+
AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);