You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/05/08 21:59:16 UTC

git commit: Added an optional timeout to metrics collection.

Repository: mesos
Updated Branches:
  refs/heads/master d2e404864 -> 02e6fa98a


Added an optional timeout to metrics collection.

Review: https://reviews.apache.org/r/21092


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/02e6fa98
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/02e6fa98
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/02e6fa98

Branch: refs/heads/master
Commit: 02e6fa98a2497c05881f3ff62cf6ca3d8a7b368f
Parents: d2e4048
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Thu May 8 12:15:23 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu May 8 12:59:08 2014 -0700

----------------------------------------------------------------------
 .../include/process/metrics/metrics.hpp         |  4 +
 3rdparty/libprocess/src/metrics/metrics.cpp     | 52 +++++++++--
 3rdparty/libprocess/src/tests/metrics_tests.cpp | 93 ++++++++++++++++++++
 3 files changed, 143 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/include/process/metrics/metrics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metrics.hpp b/3rdparty/libprocess/include/process/metrics/metrics.hpp
index d5176ee..0a97f12 100644
--- a/3rdparty/libprocess/include/process/metrics/metrics.hpp
+++ b/3rdparty/libprocess/include/process/metrics/metrics.hpp
@@ -13,6 +13,7 @@
 
 #include <stout/hashmap.hpp>
 #include <stout/nothing.hpp>
+#include <stout/option.hpp>
 
 namespace process {
 namespace metrics {
@@ -45,8 +46,11 @@ private:
 
   Future<http::Response> snapshot(const http::Request& request);
   Future<http::Response> _snapshot(const http::Request& request);
+  static std::list<Future<double> > _snapshotTimeout(
+      const std::list<Future<double> >& futures);
   static Future<http::Response> __snapshot(
       const http::Request& request,
+      const Option<Duration>& timeout,
       const hashmap<std::string, Future<double> >& metrics,
       const hashmap<std::string, Option<Statistics<double> > >& statistics);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/src/metrics/metrics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/metrics/metrics.cpp b/3rdparty/libprocess/src/metrics/metrics.cpp
index bb36c72..38905d3 100644
--- a/3rdparty/libprocess/src/metrics/metrics.cpp
+++ b/3rdparty/libprocess/src/metrics/metrics.cpp
@@ -51,6 +51,10 @@ string MetricsProcess::help()
           "This endpoint provides information regarding the current metrics ",
           "tracked by the system.",
           "",
+          "The optional query parameter 'timeout' determines the maximum ",
+          "amount of time the endpoint will take to respond. If the timeout ",
+          "is exceeded, some metrics may not be included in the response.",
+          "",
           "The key is the metric name, and the value is a double-type."));
 }
 
@@ -87,35 +91,71 @@ Future<http::Response> MetricsProcess::snapshot(const http::Request& request)
 
 Future<http::Response> MetricsProcess::_snapshot(const http::Request& request)
 {
+  // Parse the optional 'timeout' query parameter (e.g. "2secs"). When it
+  // is absent, the snapshot waits until every metric future completes.
+  Option<Duration> timeout;
+  if (request.query.contains("timeout")) {
+    string parameter = request.query.get("timeout").get();
+
+    Try<Duration> duration = Duration::parse(parameter);
+    if (duration.isError()) {
+      // Reject malformed values instead of silently ignoring them.
+      return http::BadRequest(
+          "Invalid timeout '" + parameter + "': " + duration.error() + ".\n");
+    }
+
+    timeout = duration.get();
+  }
+

   hashmap<string, Future<double> > futures;
   hashmap<string, Option<Statistics<double> > > statistics;

   foreachkey (const string& metric, metrics) {
     CHECK_NOTNULL(metrics[metric].get());
     futures[metric] = metrics[metric]->value();
-    // TODO(dhamon): It would be nice to get these in parallel.
+    // TODO(dhamon): It would be nice to compute these asynchronously.
     statistics[metric] = metrics[metric]->statistics();
   }

-  return await(futures.values())
-    .then(lambda::bind(__snapshot, request, futures, statistics));
+  // Wait for all values; if a timeout is set, stop waiting once it expires.
+  Future<list<Future<double> > > values = await(futures.values());
+  if (timeout.isSome()) {
+    values = values.after(
+        timeout.get(), lambda::bind(_snapshotTimeout, futures.values()));
+  }
+  return values.then(
+      lambda::bind(__snapshot, request, timeout, futures, statistics));
+}
+
+
+list<Future<double> > MetricsProcess::_snapshotTimeout(
+    const list<Future<double> >& futures)
+{
+  // Invoked via 'after' when the timeout expires: stop waiting and return
+  // the futures as-is; any still pending are left out of the response.
+  return futures;
 }
 
 
 Future<http::Response> MetricsProcess::__snapshot(
     const http::Request& request,
+    const Option<Duration>& timeout,
     const hashmap<string, Future<double> >& metrics,
     const hashmap<string, Option<Statistics<double> > >& statistics)
 {
   JSON::Object object;
 
   foreachpair (const string& key, const Future<double>& value, metrics) {
-    // Value.
-    if (value.isReady()) {
+    // TODO(dhamon): Maybe add the failure message for this metric to the
+    // response if value.isFailed().
+    if (value.isPending()) {
+      CHECK_SOME(timeout);
+      VLOG(1) << "Exceeded timeout of " << timeout.get() << " when attempting "
+              << "to get metric '" << key << "'";
+    } else if (value.isReady()) {
       object.values[key] = value.get();
     }
 
-    // Statistics.
     Option<Statistics<double> > statistics_ = statistics.get(key).get();
 
     if (statistics_.isSome()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/02e6fa98/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 9f04b9b..8a13121 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -18,6 +18,7 @@
 
 using namespace process;
 
+using process::http::BadRequest;
 using process::http::OK;
 using process::http::Response;
 
@@ -38,6 +39,11 @@ public:
   {
     return Failure("failure");
   }
+
+  Future<double> pending()
+  {
+    return Future<double>();  // Left pending forever; exercises timeouts.
+  }
 };
 
 
@@ -155,9 +161,11 @@ TEST(Metrics, Snapshot)
   ASSERT_TRUE(pid);
 
   Gauge gauge("test/gauge", defer(pid, &GaugeProcess::get));
+  Gauge gaugeFail("test/gauge_fail", defer(pid, &GaugeProcess::fail));
   Counter counter("test/counter");
 
   AWAIT_READY(metrics::add(gauge));
+  AWAIT_READY(metrics::add(gaugeFail));
   AWAIT_READY(metrics::add(counter));
 
   // Advance the clock to avoid rate limit.
@@ -165,21 +173,106 @@ TEST(Metrics, Snapshot)
 
   // Get the snapshot.
   response = http::get(upid, "snapshot");
+
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
   JSON::Object expected;
   expected.values["test/counter"] = 0.0;
   expected.values["test/gauge"] = 42.0;
+
   AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
 
   // Remove the metrics and ensure they are no longer in the snapshot.
   AWAIT_READY(metrics::remove(gauge));
+  AWAIT_READY(metrics::remove(gaugeFail));
   AWAIT_READY(metrics::remove(counter));
 
   // Advance the clock to avoid rate limit.
   Clock::advance(Seconds(1));
 
   response = http::get(upid, "snapshot");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+  terminate(process);
+  wait(process);
+}
+
+
+TEST(Metrics, SnapshotTimeout)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  UPID upid("metrics", process::ip(), process::port());
+
+  Clock::pause();
+
+  // Advance the clock to avoid rate limit.
+  Clock::advance(Seconds(1));
+
+  // Ensure the timeout parameter is validated.
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      BadRequest().status,
+      http::get(upid, "snapshot?timeout=foobar"));
+
+  // Advance the clock to avoid rate limit.
+  Clock::advance(Seconds(1));
+
+  // Before adding any metrics, the response should be empty.
+  Future<Response> response = http::get(upid, "snapshot?timeout=2secs");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+  // Add a gauge and a counter.
+  GaugeProcess process;
+  PID<GaugeProcess> pid = spawn(&process);
+  ASSERT_TRUE(pid);
+
+  Gauge gauge("test/gauge", defer(pid, &GaugeProcess::get));
+  Gauge gaugeFail("test/gauge_fail", defer(pid, &GaugeProcess::fail));
+  Gauge gaugeTimeout("test/gauge_timeout", defer(pid, &GaugeProcess::pending));
+  Counter counter("test/counter");
+
+  AWAIT_READY(metrics::add(gauge));
+  AWAIT_READY(metrics::add(gaugeFail));
+  AWAIT_READY(metrics::add(gaugeTimeout));
+  AWAIT_READY(metrics::add(counter));
+
+  // Advance the clock to avoid rate limit.
+  Clock::advance(Seconds(1));
+
+  // Get the snapshot.
+  response = http::get(upid, "snapshot?timeout=2secs");
+
+  // Make sure the request is pending before the timeout is exceeded.
+  Clock::settle();
+
+  ASSERT_TRUE(response.isPending());
+
+  // Advance the clock to trigger the timeout.
+  Clock::advance(Seconds(2));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  JSON::Object expected;
+  expected.values["test/counter"] = 0.0;
+  expected.values["test/gauge"] = 42.0;
+
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
+
+  // Remove the metrics and ensure they are no longer in the snapshot.
+  AWAIT_READY(metrics::remove(gauge));
+  AWAIT_READY(metrics::remove(gaugeFail));
+  AWAIT_READY(metrics::remove(gaugeTimeout));
+  AWAIT_READY(metrics::remove(counter));
+
+  // Advance the clock to avoid rate limit.
+  Clock::advance(Seconds(1));
+
+  response = http::get(upid, "snapshot?timeout=2secs");
+
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
   AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);