You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/05/27 23:55:45 UTC

git commit: Ported libprocess system/stats.json endpoint to new metrics library.

Repository: mesos
Updated Branches:
  refs/heads/master c5aa1dd22 -> 997397051


Ported libprocess system/stats.json endpoint to new metrics library.

Review: https://reviews.apache.org/r/19545


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/99739705
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/99739705
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/99739705

Branch: refs/heads/master
Commit: 99739705143330abaeee7d5de82ba487631c5625
Parents: c5aa1dd
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Tue May 27 14:55:16 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Tue May 27 14:55:17 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 3rdparty/libprocess/include/process/system.hpp  | 146 ++++++++++++++++---
 3rdparty/libprocess/src/tests/metrics_tests.cpp | 146 ++++++++++++-------
 3rdparty/libprocess/src/tests/system_tests.cpp  |  56 +++++++
 4 files changed, 281 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/99739705/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index fa9b4fd..b687068 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -147,6 +147,7 @@ tests_SOURCES =							\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
   src/tests/subprocess_tests.cpp				\
+  src/tests/system_tests.cpp					\
   src/tests/timeseries_tests.cpp				\
   src/tests/time_tests.cpp
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/99739705/3rdparty/libprocess/include/process/system.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/system.hpp b/3rdparty/libprocess/include/process/system.hpp
index fbd1eaa..366a4b2 100644
--- a/3rdparty/libprocess/include/process/system.hpp
+++ b/3rdparty/libprocess/include/process/system.hpp
@@ -4,9 +4,13 @@
 #include <string>
 
 #include <process/future.hpp>
+#include <process/help.hpp>
 #include <process/http.hpp>
 #include <process/process.hpp>
 
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+
 #include <stout/os.hpp>
 
 namespace process {
@@ -17,18 +21,132 @@ namespace process {
 class System : public Process<System>
 {
 public:
-  System() : ProcessBase("system") {}
+  System()
+    : ProcessBase("system"),
+      load_1min(
+          self().id + "/load_1min",
+          defer(self(), &System::_load_1min)),
+      load_5min(
+          self().id + "/load_5min",
+          defer(self(), &System::_load_5min)),
+      load_15min(
+          self().id + "/load_15min",
+          defer(self(), &System::_load_15min)),
+      cpus_total(
+          self().id + "/cpus_total",
+          defer(self(), &System::_cpus_total)),
+      mem_total_bytes(
+          self().id + "/mem_total_bytes",
+          defer(self(), &System::_mem_total_bytes)),
+      mem_free_bytes(
+          self().id + "/mem_free_bytes",
+          defer(self(), &System::_mem_free_bytes)) {}
 
   virtual ~System() {}
 
 protected:
   virtual void initialize()
   {
-    route("/stats.json", STATS_HELP, &System::stats);
+    // TODO(dhamon): Check return values
+    metrics::add(load_1min);
+    metrics::add(load_5min);
+    metrics::add(load_15min);
+    metrics::add(cpus_total);
+    metrics::add(mem_total_bytes);
+    metrics::add(mem_free_bytes);
+
+    route("/stats.json", statsHelp(), &System::stats);
+  }
+
+  virtual void finalize()
+  {
+    metrics::remove(load_1min);
+    metrics::remove(load_5min);
+    metrics::remove(load_15min);
+    metrics::remove(cpus_total);
+    metrics::remove(mem_total_bytes);
+    metrics::remove(mem_free_bytes);
   }
 
 private:
-  static const std::string STATS_HELP;
+  static std::string statsHelp()
+  {
+    return HELP(
+      TLDR(
+          "Shows local system metrics."),
+      USAGE(
+          "/system/stats.json"),
+      DESCRIPTION(
+          ">        cpus_total          Total number of available CPUs",
+          ">        load_1min           Average system load for last"
+          " minute in uptime(1) style",
+          ">        load_5min           Average system load for last"
+          " 5 minutes in uptime(1) style",
+          ">        load_15min          Average system load for last"
+          " 15 minutes in uptime(1) style",
+          ">        memory_total_bytes  Total system memory in bytes",
+          ">        memory_free_bytes   Free system memory in bytes"));
+  }
+
+  // Gauge handlers.
+  Future<double> _load_1min()
+  {
+    Try<os::Load> load = os::loadavg();
+    if (load.isSome()) {
+      return load.get().one;
+    }
+    return Failure("loadavg not available.");
+  }
+
+
+  Future<double> _load_5min()
+  {
+    Try<os::Load> load = os::loadavg();
+    if (load.isSome()) {
+      return load.get().five;
+    }
+    return Failure("loadavg not available.");
+  }
+
+
+  Future<double> _load_15min()
+  {
+    Try<os::Load> load = os::loadavg();
+    if (load.isSome()) {
+      return load.get().fifteen;
+    }
+    return Failure("loadavg not available.");
+  }
+
+
+  Future<double> _cpus_total()
+  {
+    Try<long> cpus = os::cpus();
+    if (cpus.isSome()) {
+      return cpus.get();
+    }
+    return Failure("cpus not available.");
+  }
+
+
+  Future<double> _mem_total_bytes()
+  {
+    Try<os::Memory> memory = os::memory();
+    if (memory.isSome()) {
+      return memory.get().total.bytes();
+    }
+    return Failure("memory not available.");
+  }
+
+
+  Future<double> _mem_free_bytes()
+  {
+    Try<os::Memory> memory = os::memory();
+    if (memory.isSome()) {
+      return memory.get().free.bytes();
+    }
+    return Failure("memory not available.");
+  }
 
   // HTTP endpoints.
   Future<http::Response> stats(const http::Request& request)
@@ -54,22 +172,16 @@ private:
 
     return http::OK(object, request.query.get("jsonp"));
   }
-};
 
+  metrics::Gauge load_1min;
+  metrics::Gauge load_5min;
+  metrics::Gauge load_15min;
+
+  metrics::Gauge cpus_total;
 
-const std::string System::STATS_HELP = HELP(
-    TLDR(
-        "Shows local system metrics."),
-    USAGE(
-        "/system/stats.json"),
-    DESCRIPTION(
-        ">        cpus_total           Total number of available CPUs",
-        ">        avg_load_1min        Average system load for last"
-        " minute in uptime(1) style",
-        ">        avg_load_5min        Average system load for last"
-        " 5 minutes in uptime(1) style",
-        ">        avg_load_15min       Average system load for last"
-        " 15 minutes in uptime(1) style"));
+  metrics::Gauge mem_total_bytes;
+  metrics::Gauge mem_free_bytes;
+};
 
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/99739705/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
index 2707d44..a822bc7 100644
--- a/3rdparty/libprocess/src/tests/metrics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -1,5 +1,8 @@
 #include <gtest/gtest.h>
 
+#include <map>
+#include <string>
+
 #include <stout/duration.hpp>
 #include <stout/gtest.hpp>
 
@@ -26,6 +29,8 @@ using process::metrics::Counter;
 using process::metrics::Gauge;
 using process::metrics::Timer;
 
+using std::map;
+using std::string;
 
 class GaugeProcess : public Process<GaugeProcess>
 {
@@ -146,15 +151,6 @@ TEST(Metrics, Snapshot)
 
   Clock::pause();
 
-  // Advance the clock to avoid rate limit.
-  Clock::advance(Seconds(1));
-
-  // Before adding any metrics, the response should be empty.
-  Future<Response> response = http::get(upid, "snapshot");
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
-
   // Add a gauge and a counter.
   GaugeProcess process;
   PID<GaugeProcess> pid = spawn(&process);
@@ -172,15 +168,23 @@ TEST(Metrics, Snapshot)
   Clock::advance(Seconds(1));
 
   // Get the snapshot.
-  response = http::get(upid, "snapshot");
-
+  Future<Response> response = http::get(upid, "snapshot");
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  JSON::Object expected;
-  expected.values["test/counter"] = 0.0;
-  expected.values["test/gauge"] = 42.0;
+  // Parse the response.
+  Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(responseJSON);
+
+  map<string, JSON::Value> values = responseJSON.get().values;
+
+  EXPECT_EQ(1u, values.count("test/counter"));
+  EXPECT_FLOAT_EQ(0.0, values["test/counter"].as<JSON::Number>().value);
 
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
+  EXPECT_EQ(1u, values.count("test/gauge"));
+  EXPECT_FLOAT_EQ(42.0, values["test/gauge"].as<JSON::Number>().value);
+
+  EXPECT_EQ(0u, values.count("test/gauge_fail"));
 
   // Remove the metrics and ensure they are no longer in the snapshot.
   AWAIT_READY(metrics::remove(gauge));
@@ -190,10 +194,21 @@ TEST(Metrics, Snapshot)
   // Advance the clock to avoid rate limit.
   Clock::advance(Seconds(1));
 
+  // Ensure MetricsProcess has removed the metrics.
+  Clock::settle();
+
   response = http::get(upid, "snapshot");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+  // Parse the response.
+  responseJSON = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(responseJSON);
+
+  values = responseJSON.get().values;
+  EXPECT_EQ(0u, values.count("test/counter"));
+  EXPECT_EQ(0u, values.count("test/gauge"));
+  EXPECT_EQ(0u, values.count("test/gauge_fail"));
 
   terminate(process);
   wait(process);
@@ -219,13 +234,7 @@ TEST(Metrics, SnapshotTimeout)
   // Advance the clock to avoid rate limit.
   Clock::advance(Seconds(1));
 
-  // Before adding any metrics, the response should be empty.
-  Future<Response> response = http::get(upid, "snapshot?timeout=2secs");
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
-
-  // Add a gauge and a counter.
+  // Add gauges and a counter.
   GaugeProcess process;
   PID<GaugeProcess> pid = spawn(&process);
   ASSERT_TRUE(pid);
@@ -244,7 +253,7 @@ TEST(Metrics, SnapshotTimeout)
   Clock::advance(Seconds(1));
 
   // Get the snapshot.
-  response = http::get(upid, "snapshot?timeout=2secs");
+  Future<Response> response = http::get(upid, "snapshot?timeout=2secs");
 
   // Make sure the request is pending before the timeout is exceeded.
   Clock::settle();
@@ -256,11 +265,21 @@ TEST(Metrics, SnapshotTimeout)
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
 
-  JSON::Object expected;
-  expected.values["test/counter"] = 0.0;
-  expected.values["test/gauge"] = 42.0;
+  // Parse the response.
+  Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(responseJSON);
 
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
+  map<string, JSON::Value> values = responseJSON.get().values;
+
+  EXPECT_EQ(1u, values.count("test/counter"));
+  EXPECT_FLOAT_EQ(0.0, values["test/counter"].as<JSON::Number>().value);
+
+  EXPECT_EQ(1u, values.count("test/gauge"));
+  EXPECT_FLOAT_EQ(42.0, values["test/gauge"].as<JSON::Number>().value);
+
+  EXPECT_EQ(0u, values.count("test/gauge_fail"));
+  EXPECT_EQ(0u, values.count("test/gauge_timeout"));
 
   // Remove the metrics and ensure they are no longer in the snapshot.
   AWAIT_READY(metrics::remove(gauge));
@@ -271,10 +290,24 @@ TEST(Metrics, SnapshotTimeout)
   // Advance the clock to avoid rate limit.
   Clock::advance(Seconds(1));
 
+  // Ensure MetricsProcess has removed the metrics.
+  Clock::settle();
+
   response = http::get(upid, "snapshot?timeout=2secs");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
+
+  // Parse the response.
+  responseJSON = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(responseJSON);
+
+  values = responseJSON.get().values;
+
+  ASSERT_SOME(responseJSON);
+  EXPECT_EQ(0u, values.count("test/counter"));
+  EXPECT_EQ(0u, values.count("test/gauge"));
+  EXPECT_EQ(0u, values.count("test/gauge_fail"));
+  EXPECT_EQ(0u, values.count("test/gauge_timeout"));
 
   terminate(process);
   wait(process);
@@ -288,15 +321,6 @@ TEST(Metrics, SnapshotStatistics)
 
   Clock::pause();
 
-  // Advance the clock to avoid rate limit.
-  Clock::advance(Seconds(1));
-  Future<Response> response = http::get(upid, "snapshot");
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(JSON::Object()), response);
-
-  Clock::pause();
-
   Counter counter("test/counter", process::TIME_SERIES_WINDOW);
 
   AWAIT_READY(metrics::add(counter));
@@ -306,26 +330,46 @@ TEST(Metrics, SnapshotStatistics)
     ++counter;
   }
 
-  JSON::Object expected;
+  hashmap<std::string, double> expected;
 
-  expected.values["test/counter"] = 10.0;
+  expected["test/counter"] = 10.0;
 
-  expected.values["test/counter/count"] = 11;
+  expected["test/counter/count"] = 11;
 
-  expected.values["test/counter/min"] = 0.0;
-  expected.values["test/counter/max"] = 10.0;
+  expected["test/counter/min"] = 0.0;
+  expected["test/counter/max"] = 10.0;
 
-  expected.values["test/counter/p50"] = 5.0;
-  expected.values["test/counter/p90"] = 9.0;
-  expected.values["test/counter/p95"] = 9.5;
-  expected.values["test/counter/p99"] = 9.9;
-  expected.values["test/counter/p999"] = 9.99;
-  expected.values["test/counter/p9999"] = 9.999;
+  expected["test/counter/p50"] = 5.0;
+  expected["test/counter/p90"] = 9.0;
+  expected["test/counter/p95"] = 9.5;
+  expected["test/counter/p99"] = 9.9;
+  expected["test/counter/p999"] = 9.99;
+  expected["test/counter/p9999"] = 9.999;
 
-  response = http::get(upid, "snapshot");
+  Future<Response> response = http::get(upid, "snapshot");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(stringify(expected), response);
+
+  Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(response.get().body);
+
+  ASSERT_SOME(responseJSON);
+
+  // Pull the response values into a map.
+  hashmap<std::string, double> responseValues;
+  foreachpair (const std::string& key,
+               const JSON::Value& value,
+               responseJSON.get().values) {
+    if (value.is<JSON::Number>()) {
+      responseValues[key] = value.as<JSON::Number>().value;
+    }
+  }
+
+  // Ensure the expected keys are in the response and that the values match
+  // expectations.
+  foreachkey (const std::string& key, expected) {
+    EXPECT_FLOAT_EQ(expected[key], responseValues[key]);
+  }
 
   AWAIT_READY(metrics::remove(counter));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/99739705/3rdparty/libprocess/src/tests/system_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/system_tests.cpp b/3rdparty/libprocess/src/tests/system_tests.cpp
new file mode 100644
index 0000000..e07314f
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/system_tests.cpp
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/http.hpp>
+#include <process/system.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include <stout/gtest.hpp>
+
+using namespace process;
+
+using process::metrics::internal::MetricsProcess;
+
+TEST(System, Metrics)
+{
+  Future<http::Response> response =
+    http::get(MetricsProcess::instance()->self(), "snapshot");
+
+  AWAIT_READY(response);
+
+  EXPECT_SOME_EQ(
+      "application/json",
+      response.get().headers.get("Content-Type"));
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response.get().body);
+  ASSERT_SOME(parse);
+
+  JSON::Object stats = parse.get();
+
+  EXPECT_EQ(1u, stats.values.count("system/load_1min"));
+  EXPECT_EQ(1u, stats.values.count("system/load_5min"));
+  EXPECT_EQ(1u, stats.values.count("system/load_15min"));
+  EXPECT_EQ(1u, stats.values.count("system/cpus_total"));
+  EXPECT_EQ(1u, stats.values.count("system/mem_total_bytes"));
+  EXPECT_EQ(1u, stats.values.count("system/mem_free_bytes"));
+}