You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/11 00:00:18 UTC

git commit: Added a metrics API.

Repository: mesos
Updated Branches:
  refs/heads/master df0ead2dd -> 847db5288


Added a metrics API.

Review: https://reviews.apache.org/r/18718


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/847db528
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/847db528
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/847db528

Branch: refs/heads/master
Commit: 847db5288c86d9b452028c3e45098d7c1dbf2e0d
Parents: df0ead2
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Thu Apr 10 14:04:53 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Apr 10 14:57:09 2014 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   6 +
 .../include/process/metrics/counter.hpp         |  65 +++++++++++
 .../include/process/metrics/gauge.hpp           |  44 ++++++++
 .../include/process/metrics/metric.hpp          |  40 +++++++
 .../include/process/metrics/metrics.hpp         |  77 +++++++++++++
 3rdparty/libprocess/src/metrics/metrics.cpp     | 113 +++++++++++++++++++
 3rdparty/libprocess/src/tests/metrics_tests.cpp |  81 +++++++++++++
 7 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index c785c4d..d707ad7 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES =		\
   src/gate.hpp			\
   src/http.cpp			\
   src/latch.cpp			\
+  src/metrics/metrics.cpp	\
   src/pid.cpp			\
   src/process.cpp		\
   src/reap.cpp			\
@@ -87,6 +88,10 @@ libprocess_la_SOURCES +=					\
   $(top_srcdir)/include/process/limiter.hpp			\
   $(top_srcdir)/include/process/logging.hpp			\
   $(top_srcdir)/include/process/message.hpp			\
+  $(top_srcdir)/include/process/metrics/counter.hpp		\
+  $(top_srcdir)/include/process/metrics/gauge.hpp		\
+  $(top_srcdir)/include/process/metrics/metric.hpp		\
+  $(top_srcdir)/include/process/metrics/metrics.hpp		\
   $(top_srcdir)/include/process/mime.hpp			\
   $(top_srcdir)/include/process/mutex.hpp			\
   $(top_srcdir)/include/process/once.hpp			\
@@ -126,6 +131,7 @@ tests_SOURCES =							\
   src/tests/io_tests.cpp					\
   src/tests/main.cpp						\
   src/tests/mutex_tests.cpp					\
+  src/tests/metrics_tests.cpp					\
   src/tests/owned_tests.cpp					\
   src/tests/process_tests.cpp					\
   src/tests/reap_tests.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/counter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp
new file mode 100644
index 0000000..f4774ad
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/counter.hpp
@@ -0,0 +1,65 @@
+#ifndef __PROCESS_METRICS_COUNTER_HPP__
+#define __PROCESS_METRICS_COUNTER_HPP__
+
+#include <string>
+
+#include <process/metrics/metric.hpp>
+
+namespace process {
+namespace metrics {
+
+// A Metric that represents an integer value that can be incremented and
+// decremented.
+class Counter : public Metric
+{
+public:
+  explicit Counter(const std::string& name)
+    : Metric(name),
+      data(new Data()) {}
+
+  virtual ~Counter() {}
+
+  virtual Future<double> value() const
+  {
+    return static_cast<double>(data->v);
+  }
+
+  void reset()
+  {
+    __sync_fetch_and_and(&data->v, 0);
+  }
+
+  Counter& operator ++ ()
+  {
+    return *this += 1;
+  }
+
+  Counter operator ++ (int)
+  {
+    Counter c(*this);
+    ++(*this);
+    return c;
+  }
+
+  Counter& operator += (int64_t v)
+  {
+    __sync_fetch_and_add(&data->v, v);
+    return *this;
+  }
+
+private:
+  struct Data
+  {
+    explicit Data() : v(0) {}
+
+    // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands.
+    volatile int64_t v;
+  };
+
+  boost::shared_ptr<Data> data;
+};
+
+}  // namespace metrics {
+}  // namespace process {
+
+#endif  // __PROCESS_METRICS_COUNTER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/gauge.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/gauge.hpp b/3rdparty/libprocess/include/process/metrics/gauge.hpp
new file mode 100644
index 0000000..4f5c108
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/gauge.hpp
@@ -0,0 +1,44 @@
+#ifndef __PROCESS_METRICS_GAUGE_HPP__
+#define __PROCESS_METRICS_GAUGE_HPP__
+
+#include <string>
+
+#include <process/defer.hpp>
+
+#include <process/metrics/metric.hpp>
+
+#include <stout/lambda.hpp>
+
+namespace process {
+namespace metrics {
+
+// A Metric that represents an instantaneous value evaluated when
+// 'value' is called.
+class Gauge : public Metric
+{
+public:
+  Gauge(const std::string& name,
+        const Deferred<Future<double> (void)>& f)
+    : Metric(name),
+      data(new Data(f)) {}
+
+  virtual ~Gauge() {}
+
+  virtual Future<double> value() const { return data->f(); }
+
+private:
+  struct Data
+  {
+    explicit Data(const Deferred<Future<double> (void)>& _f)
+      : f(_f) {}
+
+    const Deferred<Future<double> (void)> f;
+  };
+
+  boost::shared_ptr<Data> data;
+};
+
+}  // namespace metrics {
+}  // namespace process {
+
+#endif  // __PROCESS_METRICS_GAUGE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/metric.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metric.hpp b/3rdparty/libprocess/include/process/metrics/metric.hpp
new file mode 100644
index 0000000..ea64f69
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/metric.hpp
@@ -0,0 +1,40 @@
+#ifndef __PROCESS_METRICS_METRIC_HPP__
+#define __PROCESS_METRICS_METRIC_HPP__
+
+#include <string>
+
+#include <boost/smart_ptr/shared_ptr.hpp>
+
+#include <process/future.hpp>
+
+namespace process {
+namespace metrics {
+
+// The base class for Metrics such as Counter and Gauge.
+class Metric {
+public:
+  virtual ~Metric() {}
+
+  virtual Future<double> value() const = 0;
+
+  const std::string& name() const { return data->name; }
+
+protected:
+  // Only derived classes can construct.
+  explicit Metric(const std::string& name)
+    : data(new Data(name)) {}
+
+private:
+  struct Data {
+    explicit Data(const std::string& _name) : name(_name) {}
+
+    const std::string name;
+  };
+
+  boost::shared_ptr<Data> data;
+};
+
+}  // namespace metrics {
+}  // namespace process {
+
+#endif  // __PROCESS_METRICS_METRIC_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/metrics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metrics.hpp b/3rdparty/libprocess/include/process/metrics/metrics.hpp
new file mode 100644
index 0000000..c20bb63
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/metrics.hpp
@@ -0,0 +1,77 @@
+#ifndef __PROCESS_METRICS_METRICS_HPP__
+#define __PROCESS_METRICS_METRICS_HPP__
+
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/metric.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+
+namespace process {
+namespace metrics {
+
+namespace internal {
+
+class MetricsProcess : public Process<MetricsProcess>
+{
+public:
+  static MetricsProcess* instance();
+
+  Future<Nothing> add(Owned<Metric> metric);
+
+  Future<Nothing> remove(const std::string& name);
+
+protected:
+  virtual void initialize();
+
+private:
+  static std::string help();
+
+  MetricsProcess() : ProcessBase("metrics") {}
+
+  // Non-copyable, non-assignable.
+  MetricsProcess(const MetricsProcess&);
+  MetricsProcess& operator = (const MetricsProcess&);
+
+  Future<http::Response> snapshot(const http::Request& request);
+  static Future<http::Response> _snapshot(
+      const http::Request& request,
+      const hashmap<std::string, Future<double> >& metrics);
+
+  // The Owned<Metric> is an explicit copy of the Metric passed to 'add'.
+  hashmap<std::string, Owned<Metric> > metrics;
+};
+
+}  // namespace internal {
+
+
+template <typename T>
+Future<Nothing> add(const T& metric)
+{
+  // There is an explicit copy in this call to ensure we end up owning
+  // the last copy of a Metric when we remove it.
+  return dispatch(
+      internal::MetricsProcess::instance(),
+      &internal::MetricsProcess::add,
+      Owned<Metric>(new T(metric)));
+}
+
+
+inline Future<Nothing> remove(const Metric& metric)
+{
+  return dispatch(
+      internal::MetricsProcess::instance(),
+      &internal::MetricsProcess::remove,
+      metric.name());
+}
+
+}  // namespace metrics {
+}  // namespace process {
+
+#endif  // __PROCESS_METRICS_METRICS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/src/metrics/metrics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/metrics/metrics.cpp b/3rdparty/libprocess/src/metrics/metrics.cpp
new file mode 100644
index 0000000..391295a
--- /dev/null
+++ b/3rdparty/libprocess/src/metrics/metrics.cpp
@@ -0,0 +1,113 @@
+#include <glog/logging.h>
+
+#include <list>
+#include <string>
+
+#include <process/collect.hpp>
+#include <process/dispatch.hpp>
+#include <process/help.hpp>
+#include <process/once.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+
+using std::list;
+using std::string;
+
+namespace process {
+namespace metrics {
+namespace internal {
+
+MetricsProcess* MetricsProcess::instance()
+{
+  static MetricsProcess* singleton = NULL;
+  static Once* initialized = new Once();
+
+  if (!initialized->once()) {
+    singleton = new MetricsProcess();
+    spawn(singleton);
+    initialized->done();
+  }
+
+  return singleton;
+}
+
+
+void MetricsProcess::initialize()
+{
+  route("/snapshot", help(), &MetricsProcess::snapshot);
+}
+
+
+string MetricsProcess::help()
+{
+  return HELP(
+      TLDR("Provides a snapshot of the current metrics."),
+      USAGE("/metrics/snapshot"),
+      DESCRIPTION(
+          "This endpoint provides information regarding the current metrics ",
+          "tracked by the system.",
+          "",
+          "The key is the metric name, and the value is a double-type."));
+}
+
+
+Future<Nothing> MetricsProcess::add(Owned<Metric> metric)
+{
+  if (metrics.contains(metric->name())) {
+    return Failure("Metric '" + metric->name() + "' was already added.");
+  }
+
+  metrics[metric->name()] = metric;
+  return Nothing();
+}
+
+
+Future<Nothing> MetricsProcess::remove(const std::string& name)
+{
+  if (!metrics.contains(name)) {
+    return Failure("Metric '" + name + "' not found.");
+  }
+
+  metrics.erase(name);
+
+  return Nothing();
+}
+
+
+// TODO(dhamon): Allow querying by context and context/name.
+Future<http::Response> MetricsProcess::snapshot(const http::Request& request)
+{
+  hashmap<string, Future<double> > futures;
+
+  foreachkey (const string& metric, metrics) {
+    CHECK_NOTNULL(metrics[metric].get());
+    futures[metric] = metrics[metric]->value();
+  }
+
+  return await(futures.values())
+    .then(lambda::bind(_snapshot, request, futures));
+}
+
+
+Future<http::Response> MetricsProcess::_snapshot(
+    const http::Request& request,
+    const hashmap<string, Future<double> >& metrics)
+{
+  JSON::Object object;
+
+  foreachpair (const string& key, const Future<double>& value, metrics) {
+    if (value.isReady()) {
+      object.values[key] = value.get();
+    }
+  }
+
+  return http::OK(object, request.query.get("jsonp"));
+}
+
+}  // namespace internal {
+}  // namespace metrics {
+}  // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
new file mode 100644
index 0000000..0cc9f4b
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -0,0 +1,81 @@
+#include <gtest/gtest.h>
+
+#include <stout/gtest.hpp>
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+
+
+using process::Deferred;
+using process::Failure;
+using process::Future;
+using process::PID;
+using process::Process;
+
+using process::metrics::add;
+using process::metrics::remove;
+using process::metrics::Counter;
+using process::metrics::Gauge;
+
+
+class GaugeProcess : public Process<GaugeProcess>
+{
+public:
+  double get() { return 42.0; }
+  Future<double> fail() { return Failure("failure"); }
+};
+
+// TODO(dhamon): Add test for JSON equality.
+// TODO(dhamon): Add tests for JSON access with and without removal.
+
+TEST(MetricsTest, Counter)
+{
+  Counter c("test/counter");
+  AWAIT_READY(add(c));
+
+  EXPECT_FLOAT_EQ(0.0, c.value().get());
+  ++c;
+  EXPECT_FLOAT_EQ(1.0, c.value().get());
+  c++;
+  EXPECT_FLOAT_EQ(2.0, c.value().get());
+
+  c.reset();
+  EXPECT_FLOAT_EQ(0.0, c.value().get());
+
+  c += 42;
+  EXPECT_FLOAT_EQ(42.0, c.value().get());
+
+  AWAIT_READY(remove(c));
+}
+
+
+TEST(MetricsTest, Gauge)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  GaugeProcess process;
+  PID<GaugeProcess> pid = spawn(&process);
+  ASSERT_TRUE(pid);
+
+  Gauge g("test/gauge", defer(pid, &GaugeProcess::get));
+  Gauge g2("test/failedgauge", defer(pid, &GaugeProcess::fail));
+
+  AWAIT_READY(add(g));
+  AWAIT_READY(add(g2));
+
+  AWAIT_READY(g.value());
+  EXPECT_EQ(42.0, g.value().get());
+
+  AWAIT_FAILED(g2.value());
+
+  AWAIT_READY(remove(g2));
+  AWAIT_READY(remove(g));
+
+  terminate(process);
+  wait(process);
+}