You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/11 00:00:18 UTC
git commit: Added a metrics API.
Repository: mesos
Updated Branches:
refs/heads/master df0ead2dd -> 847db5288
Added a metrics API.
Review: https://reviews.apache.org/r/18718
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/847db528
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/847db528
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/847db528
Branch: refs/heads/master
Commit: 847db5288c86d9b452028c3e45098d7c1dbf2e0d
Parents: df0ead2
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Thu Apr 10 14:04:53 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Apr 10 14:57:09 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 6 +
.../include/process/metrics/counter.hpp | 65 +++++++++++
.../include/process/metrics/gauge.hpp | 44 ++++++++
.../include/process/metrics/metric.hpp | 40 +++++++
.../include/process/metrics/metrics.hpp | 77 +++++++++++++
3rdparty/libprocess/src/metrics/metrics.cpp | 113 +++++++++++++++++++
3rdparty/libprocess/src/tests/metrics_tests.cpp | 81 +++++++++++++
7 files changed, 426 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index c785c4d..d707ad7 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES = \
src/gate.hpp \
src/http.cpp \
src/latch.cpp \
+ src/metrics/metrics.cpp \
src/pid.cpp \
src/process.cpp \
src/reap.cpp \
@@ -87,6 +88,10 @@ libprocess_la_SOURCES += \
$(top_srcdir)/include/process/limiter.hpp \
$(top_srcdir)/include/process/logging.hpp \
$(top_srcdir)/include/process/message.hpp \
+ $(top_srcdir)/include/process/metrics/counter.hpp \
+ $(top_srcdir)/include/process/metrics/gauge.hpp \
+ $(top_srcdir)/include/process/metrics/metric.hpp \
+ $(top_srcdir)/include/process/metrics/metrics.hpp \
$(top_srcdir)/include/process/mime.hpp \
$(top_srcdir)/include/process/mutex.hpp \
$(top_srcdir)/include/process/once.hpp \
@@ -126,6 +131,7 @@ tests_SOURCES = \
src/tests/io_tests.cpp \
src/tests/main.cpp \
src/tests/mutex_tests.cpp \
+ src/tests/metrics_tests.cpp \
src/tests/owned_tests.cpp \
src/tests/process_tests.cpp \
src/tests/reap_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/counter.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/counter.hpp b/3rdparty/libprocess/include/process/metrics/counter.hpp
new file mode 100644
index 0000000..f4774ad
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/counter.hpp
@@ -0,0 +1,65 @@
+#ifndef __PROCESS_METRICS_COUNTER_HPP__
+#define __PROCESS_METRICS_COUNTER_HPP__
+
+#include <string>
+
+#include <process/metrics/metric.hpp>
+
+namespace process {
+namespace metrics {
+
+// A Metric that represents an integer value that can be incremented and
+// decremented. Updates and reads go through the GCC __sync atomic builtins.
+class Counter : public Metric
+{
+public:
+ explicit Counter(const std::string& name)
+ : Metric(name),
+ data(new Data()) {} // The count starts at zero (see Data below).
+
+ virtual ~Counter() {}
+
+ virtual Future<double> value() const // Returns the current count.
+ { // Adding 0 gives an atomic read; a plain 64-bit load can tear on 32-bit.
+ return static_cast<double>(__sync_fetch_and_add(&data->v, 0));
+ }
+
+ void reset() // Atomically sets the count back to zero (v &= 0).
+ {
+ __sync_fetch_and_and(&data->v, 0);
+ }
+
+ Counter& operator ++ () // Pre-increment: atomically adds one.
+ {
+ return *this += 1;
+ }
+
+ Counter operator ++ (int) // Post-increment: adds one, returns a copy.
+ {
+ Counter c(*this); // NOTE: 'c' shares 'data', so it observes the new count.
+ ++(*this);
+ return c;
+ }
+
+ Counter& operator += (int64_t v) // Atomically adds 'v' (may be negative).
+ {
+ __sync_fetch_and_add(&data->v, v);
+ return *this;
+ }
+
+private:
+ struct Data
+ {
+ explicit Data() : v(0) {}
+
+ // TODO(dhamon): Update to std::atomic<int64_t> when C++11 lands.
+ volatile int64_t v; // The count; only touched via __sync builtins above.
+ };
+
+ boost::shared_ptr<Data> data; // Shared by every copy of this Counter.
+};
+
+} // namespace metrics {
+} // namespace process {
+
+#endif // __PROCESS_METRICS_COUNTER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/gauge.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/gauge.hpp b/3rdparty/libprocess/include/process/metrics/gauge.hpp
new file mode 100644
index 0000000..4f5c108
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/gauge.hpp
@@ -0,0 +1,44 @@
+#ifndef __PROCESS_METRICS_GAUGE_HPP__
+#define __PROCESS_METRICS_GAUGE_HPP__
+
+#include <string>
+
+#include <process/defer.hpp>
+
+#include <process/metrics/metric.hpp>
+
+#include <stout/lambda.hpp>
+
+namespace process {
+namespace metrics {
+
+// A Metric that represents an instantaneous value: each call to 'value'
+// invokes the Deferred 'f' supplied at construction and returns its result.
+class Gauge : public Metric
+{
+public:
+ Gauge(const std::string& name,
+ const Deferred<Future<double> (void)>& f) // Copied into 'data'.
+ : Metric(name),
+ data(new Data(f)) {}
+
+ virtual ~Gauge() {}
+
+ virtual Future<double> value() const { return data->f(); }
+
+private:
+ struct Data
+ {
+ explicit Data(const Deferred<Future<double> (void)>& _f)
+ : f(_f) {}
+
+ const Deferred<Future<double> (void)> f; // Never reassigned.
+ };
+
+ boost::shared_ptr<Data> data; // Shared by every copy of this Gauge.
+};
+
+} // namespace metrics {
+} // namespace process {
+
+#endif // __PROCESS_METRICS_GAUGE_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/metric.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metric.hpp b/3rdparty/libprocess/include/process/metrics/metric.hpp
new file mode 100644
index 0000000..ea64f69
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/metric.hpp
@@ -0,0 +1,40 @@
+#ifndef __PROCESS_METRICS_METRIC_HPP__
+#define __PROCESS_METRICS_METRIC_HPP__
+
+#include <string>
+
+#include <boost/smart_ptr/shared_ptr.hpp>
+
+#include <process/future.hpp>
+
+namespace process {
+namespace metrics {
+
+// The base class for Metrics such as Counter and Gauge. Holds the name.
+class Metric {
+public:
+ virtual ~Metric() {}
+
+ virtual Future<double> value() const = 0; // Supplied by each subclass.
+
+ const std::string& name() const { return data->name; }
+
+protected:
+ // Only derived classes can construct.
+ explicit Metric(const std::string& name)
+ : data(new Data(name)) {}
+
+private:
+ struct Data {
+ explicit Data(const std::string& _name) : name(_name) {}
+
+ const std::string name; // Fixed at construction.
+ };
+
+ boost::shared_ptr<Data> data; // Shared by every copy of this Metric.
+};
+
+} // namespace metrics {
+} // namespace process {
+
+#endif // __PROCESS_METRICS_METRIC_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/include/process/metrics/metrics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/metrics/metrics.hpp b/3rdparty/libprocess/include/process/metrics/metrics.hpp
new file mode 100644
index 0000000..c20bb63
--- /dev/null
+++ b/3rdparty/libprocess/include/process/metrics/metrics.hpp
@@ -0,0 +1,77 @@
+#ifndef __PROCESS_METRICS_METRICS_HPP__
+#define __PROCESS_METRICS_METRICS_HPP__
+
+#include <string>
+
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/metric.hpp>
+
+#include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
+
+namespace process {
+namespace metrics {
+
+namespace internal {
+
+class MetricsProcess : public Process<MetricsProcess>
+{
+public:
+ static MetricsProcess* instance(); // The singleton; created on first use.
+
+ Future<Nothing> add(Owned<Metric> metric); // Fails on a duplicate name.
+
+ Future<Nothing> remove(const std::string& name); // Fails if not found.
+
+protected:
+ virtual void initialize(); // Registers the "/snapshot" route.
+
+private:
+ static std::string help(); // Help text for the "/snapshot" route.
+
+ MetricsProcess() : ProcessBase("metrics") {} // Private: use instance().
+
+ // Non-copyable, non-assignable.
+ MetricsProcess(const MetricsProcess&);
+ MetricsProcess& operator = (const MetricsProcess&);
+
+ Future<http::Response> snapshot(const http::Request& request);
+ static Future<http::Response> _snapshot( // Continuation of 'snapshot'.
+ const http::Request& request,
+ const hashmap<std::string, Future<double> >& metrics);
+
+ // The Owned<Metric> is an explicit copy of the Metric passed to 'add'.
+ hashmap<std::string, Owned<Metric> > metrics; // Keyed by metric name.
+};
+
+} // namespace internal {
+
+
+template <typename T>
+Future<Nothing> add(const T& metric) // Registers a copy of 'metric'.
+{
+ // There is an explicit copy in this call to ensure we end up owning
+ // the last copy of a Metric when we remove it.
+ return dispatch(
+ internal::MetricsProcess::instance(),
+ &internal::MetricsProcess::add,
+ Owned<Metric>(new T(metric))); // Requires 'T' to be copy-constructible.
+}
+
+
+inline Future<Nothing> remove(const Metric& metric) // Unregisters by name.
+{
+ return dispatch(
+ internal::MetricsProcess::instance(),
+ &internal::MetricsProcess::remove,
+ metric.name()); // Only the name identifies the metric to remove.
+}
+
+} // namespace metrics {
+} // namespace process {
+
+#endif // __PROCESS_METRICS_METRICS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/src/metrics/metrics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/metrics/metrics.cpp b/3rdparty/libprocess/src/metrics/metrics.cpp
new file mode 100644
index 0000000..391295a
--- /dev/null
+++ b/3rdparty/libprocess/src/metrics/metrics.cpp
@@ -0,0 +1,113 @@
+#include <glog/logging.h>
+
+#include <list>
+#include <string>
+
+#include <process/collect.hpp>
+#include <process/dispatch.hpp>
+#include <process/help.hpp>
+#include <process/once.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/metrics.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+
+using std::list;
+using std::string;
+
+namespace process {
+namespace metrics {
+namespace internal {
+
+MetricsProcess* MetricsProcess::instance() // Returns the singleton.
+{
+ static MetricsProcess* singleton = NULL;
+ static Once* initialized = new Once(); // Not deleted in this function.
+
+ if (!initialized->once()) { // Presumably entered once; see process/once.hpp.
+ singleton = new MetricsProcess(); // Not deleted anywhere in this file.
+ spawn(singleton);
+ initialized->done();
+ }
+
+ return singleton;
+}
+
+
+void MetricsProcess::initialize() // Routes "/snapshot" to 'snapshot'.
+{
+ route("/snapshot", help(), &MetricsProcess::snapshot);
+}
+
+
+string MetricsProcess::help() // Help text passed to route() in initialize().
+{
+ return HELP(
+ TLDR("Provides a snapshot of the current metrics."),
+ USAGE("/metrics/snapshot"),
+ DESCRIPTION(
+ "This endpoint provides information regarding the current metrics ",
+ "tracked by the system.",
+ "",
+ "The key is the metric name, and the value is a double-type."));
+}
+
+
+Future<Nothing> MetricsProcess::add(Owned<Metric> metric) // Stores by name.
+{
+ if (metrics.contains(metric->name())) { // Names must be unique.
+ return Failure("Metric '" + metric->name() + "' was already added.");
+ }
+
+ metrics[metric->name()] = metric;
+ return Nothing();
+}
+
+
+Future<Nothing> MetricsProcess::remove(const std::string& name) // Drops entry.
+{
+ if (!metrics.contains(name)) { // Removing an unknown name is a failure.
+ return Failure("Metric '" + name + "' not found.");
+ }
+
+ metrics.erase(name);
+
+ return Nothing();
+}
+
+
+// TODO(dhamon): Allow querying by context and context/name.
+Future<http::Response> MetricsProcess::snapshot(const http::Request& request)
+{
+ hashmap<string, Future<double> > futures; // Metric name -> value future.
+
+ foreachkey (const string& metric, metrics) {
+ CHECK_NOTNULL(metrics[metric].get());
+ futures[metric] = metrics[metric]->value();
+ }
+
+ return await(futures.values()) // Presumably waits for every future.
+ .then(lambda::bind(_snapshot, request, futures)); // Builds the response.
+}
+
+
+Future<http::Response> MetricsProcess::_snapshot( // Builds the JSON reply.
+ const http::Request& request,
+ const hashmap<string, Future<double> >& metrics) // Name -> value future.
+{
+ JSON::Object object;
+
+ foreachpair (const string& key, const Future<double>& value, metrics) {
+ if (value.isReady()) { // Metrics whose future is not ready are omitted.
+ object.values[key] = value.get();
+ }
+ }
+
+ return http::OK(object, request.query.get("jsonp")); // Passes "jsonp" param.
+}
+
+} // namespace internal {
+} // namespace metrics {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/847db528/3rdparty/libprocess/src/tests/metrics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/metrics_tests.cpp b/3rdparty/libprocess/src/tests/metrics_tests.cpp
new file mode 100644
index 0000000..0cc9f4b
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/metrics_tests.cpp
@@ -0,0 +1,81 @@
+#include <gtest/gtest.h>
+
+#include <stout/gtest.hpp>
+
+#include <process/future.hpp>
+#include <process/gtest.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/counter.hpp>
+#include <process/metrics/gauge.hpp>
+#include <process/metrics/metrics.hpp>
+
+
+using process::Deferred;
+using process::Failure;
+using process::Future;
+using process::PID;
+using process::Process;
+
+using process::metrics::add;
+using process::metrics::remove;
+using process::metrics::Counter;
+using process::metrics::Gauge;
+
+
+class GaugeProcess : public Process<GaugeProcess> // Backs the test gauges.
+{
+public:
+ double get() { return 42.0; } // Source for the gauge that succeeds.
+ Future<double> fail() { return Failure("failure"); } // Always fails.
+};
+
+// TODO(dhamon): Add test for JSON equality.
+// TODO(dhamon): Add tests for JSON access with and without removal.
+
+TEST(MetricsTest, Counter) // Exercises ++, +=, reset, and add/remove.
+{
+ Counter c("test/counter");
+ AWAIT_READY(add(c)); // Registers a copy of 'c'.
+
+ EXPECT_FLOAT_EQ(0.0, c.value().get());
+ ++c;
+ EXPECT_FLOAT_EQ(1.0, c.value().get());
+ c++;
+ EXPECT_FLOAT_EQ(2.0, c.value().get());
+
+ c.reset();
+ EXPECT_FLOAT_EQ(0.0, c.value().get());
+
+ c += 42;
+ EXPECT_FLOAT_EQ(42.0, c.value().get());
+
+ AWAIT_READY(remove(c)); // Removal is by name ("test/counter").
+}
+
+
+TEST(MetricsTest, Gauge) // Checks a succeeding and a failing gauge.
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ GaugeProcess process;
+ PID<GaugeProcess> pid = spawn(&process);
+ ASSERT_TRUE(pid);
+
+ Gauge g("test/gauge", defer(pid, &GaugeProcess::get)); // Yields 42.0.
+ Gauge g2("test/failedgauge", defer(pid, &GaugeProcess::fail)); // Fails.
+
+ AWAIT_READY(add(g));
+ AWAIT_READY(add(g2));
+
+ AWAIT_READY(g.value());
+ EXPECT_EQ(42.0, g.value().get());
+
+ AWAIT_FAILED(g2.value());
+
+ AWAIT_READY(remove(g2));
+ AWAIT_READY(remove(g));
+
+ terminate(process); // Shut the helper process down before it goes away.
+ wait(process);
+}