You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/04/22 05:17:53 UTC
git commit: Added Statistics for TimeSeries data.
Repository: mesos
Updated Branches:
refs/heads/master 8b678c482 -> 12e1a9675
Added Statistics for TimeSeries data.
Review: https://reviews.apache.org/r/20047
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/12e1a967
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/12e1a967
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/12e1a967
Branch: refs/heads/master
Commit: 12e1a9675a630a01988cd30679c7409e123292ae
Parents: 8b678c4
Author: Dominic Hamon <dh...@twopensource.com>
Authored: Mon Apr 21 17:57:14 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Mon Apr 21 20:14:50 2014 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 4 +-
.../libprocess/include/process/statistics.hpp | 140 ++++----
3rdparty/libprocess/src/process.cpp | 32 +-
3rdparty/libprocess/src/statistics.cpp | 331 -------------------
.../libprocess/src/tests/statistics_tests.cpp | 95 ++----
3rdparty/libprocess/src/timeseries.cpp | 14 +
6 files changed, 136 insertions(+), 480 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index d707ad7..0a8a31b 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -38,9 +38,9 @@ libprocess_la_SOURCES = \
src/pid.cpp \
src/process.cpp \
src/reap.cpp \
- src/statistics.cpp \
src/subprocess.cpp \
- src/synchronized.hpp
+ src/synchronized.hpp \
+ src/timeseries.cpp
libprocess_la_CPPFLAGS = \
-I$(srcdir)/include \
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/include/process/statistics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/statistics.hpp b/3rdparty/libprocess/include/process/statistics.hpp
index a4f1db3..d046bff 100644
--- a/3rdparty/libprocess/include/process/statistics.hpp
+++ b/3rdparty/libprocess/include/process/statistics.hpp
@@ -1,76 +1,94 @@
#ifndef __PROCESS_STATISTICS_HPP__
#define __PROCESS_STATISTICS_HPP__
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/owned.hpp>
-#include <process/time.hpp>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <vector>
+
#include <process/timeseries.hpp>
-#include <stout/duration.hpp>
-#include <stout/none.hpp>
-#include <stout/nothing.hpp>
+#include <stout/foreach.hpp>
#include <stout/option.hpp>
namespace process {
-// Forward declarations.
-class Statistics;
-class StatisticsProcess;
-
-// Libprocess statistics handle.
-// To be used from anywhere to manage statistics.
-//
-// Ex: process::statistics->increment("http", "num_requests");
-// process::statistics->set("http", "response_size", response.size());
-//
-// Statistics are exposed via JSON for external visibility.
-extern Statistics* statistics;
-
-
-// Default statistic configuration variables.
-// TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
-// which these duration constants were not being initialized when
-// having static linkage. This issue did not manifest in newer gcc's.
-// Specifically, 4.2.1 was ok. So we've moved these to have external
-// linkage but perhaps in the future we can revert this.
-extern const Duration STATISTICS_TRUNCATION_INTERVAL;
-
-
-// Provides a collection of in-memory fixed capacity time series
-// of statistics over some window. Values are truncated when they
-// fall outside the window. "Sparsification" will occur when the
-// capacity of a time series is exceeded inside the window.
-class Statistics
+// Represents statistics for a TimeSeries of data.
+template <typename T>
+struct Statistics
{
-public:
- Statistics(const Duration& window = TIME_SERIES_WINDOW,
- size_t capacity = TIME_SERIES_CAPACITY);
- ~Statistics();
-
- // Returns the time series of a statistic.
- process::Future<TimeSeries<double> > timeseries(
- const std::string& context,
- const std::string& name);
-
- // Sets the current value of a statistic at the current clock time
- // or at a specified time.
- void set(
- const std::string& context,
- const std::string& name,
- double value,
- const Time& time = Clock::now());
-
- // Increments the current value of a statistic. If no statistic was
- // previously present, an initial value of 0.0 is used.
- void increment(const std::string& context, const std::string& name);
-
- // Decrements the current value of a statistic. If no statistic was
- // previously present, an initial value of 0.0 is used.
- void decrement(const std::string& context, const std::string& name);
+ // Returns Statistics for the given TimeSeries, or None() if the
+ // TimeSeries is empty.
+ // TODO(dhamon): Consider adding a histogram abstraction for better
+ // performance.
+ static Option<Statistics<T> > from(const TimeSeries<T>& timeseries)
+ {
+ std::vector<typename TimeSeries<T>::Value> values_ = timeseries.get();
+
+ if (values_.empty()) {
+ return None();
+ }
+
+ std::vector<T> values;
+ values.reserve(values_.size());
+
+ foreach (const typename TimeSeries<T>::Value& value, values_) {
+ values.push_back(value.data);
+ }
+
+ std::sort(values.begin(), values.end());
+
+ Statistics statistics;
+
+ statistics.min = values.front();
+ statistics.max = values.back();
+
+ statistics.p50 = percentile(values, 0.5);
+ statistics.p90 = percentile(values, 0.90);
+ statistics.p95 = percentile(values, 0.95);
+ statistics.p99 = percentile(values, 0.99);
+ statistics.p999 = percentile(values, 0.999);
+ statistics.p9999 = percentile(values, 0.9999);
+
+ return statistics;
+ }
+
+ T min;
+ T max;
+
+ // TODO(dhamon): Consider making the percentiles we store dynamic.
+ T p50;
+ T p90;
+ T p95;
+ T p99;
+ T p999;
+ T p9999;
private:
- StatisticsProcess* process;
+ // Returns the requested percentile from the sorted values.
+ // TODO(dhamon): Use a 'Percentage' abstraction.
+ static T percentile(const std::vector<T>& values, double percentile)
+ {
+ CHECK(!values.empty());
+
+ if (percentile <= 0.0) {
+ return values.front();
+ }
+
+ if (percentile >= 1.0) {
+ return values.back();
+ }
+
+ // Use linear interpolation.
+ const double position = percentile * (values.size() - 1);
+ const size_t index = floor(position);
+ const double delta = position - index;
+
+ CHECK_GE(index, 0u);
+ CHECK_LT(index, values.size() - 1);
+
+ return values[index] + delta * (values[index + 1] - values[index]);
+ }
};
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 9654c04..2439526 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -594,7 +594,8 @@ ThreadLocal<ProcessBase>* _process_ = new ThreadLocal<ProcessBase>();
// Per thread executor pointer.
ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
-const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
+// TODO(dhamon): Reintroduce this when it is plumbed through to Statistics.
+// const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
// We namespace the clock related variables to keep them well
@@ -1554,20 +1555,21 @@ void initialize(const string& delegate)
spawn(new System(), true);
// Create the global statistics.
- value = getenv("LIBPROCESS_STATISTICS_WINDOW");
- if (value != NULL) {
- Try<Duration> window = Duration::parse(string(value));
- if (window.isError()) {
- LOG(FATAL) << "LIBPROCESS_STATISTICS_WINDOW=" << value
- << " is not a valid duration: " << window.error();
- }
- statistics = new Statistics(window.get());
- } else {
- // TODO(bmahler): Investigate memory implications of this window
- // size. We may also want to provide a maximum memory size rather than
- // time window. Or, offload older data to disk, etc.
- statistics = new Statistics(LIBPROCESS_STATISTICS_WINDOW);
- }
+ // TODO(dhamon): Plumb this through to metrics.
+ // value = getenv("LIBPROCESS_STATISTICS_WINDOW");
+ // if (value != NULL) {
+ // Try<Duration> window = Duration::parse(string(value));
+ // if (window.isError()) {
+ // LOG(FATAL) << "LIBPROCESS_STATISTICS_WINDOW=" << value
+ // << " is not a valid duration: " << window.error();
+ // }
+ // statistics = new Statistics(window.get());
+ // } else {
+ // // TODO(bmahler): Investigate memory implications of this window
+ // // size. We may also want to provide a maximum memory size rather than
+ // // time window. Or, offload older data to disk, etc.
+ // statistics = new Statistics(LIBPROCESS_STATISTICS_WINDOW);
+ // }
// Initialize the mime types.
mime::initialize();
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/src/statistics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/statistics.cpp b/3rdparty/libprocess/src/statistics.cpp
deleted file mode 100644
index 75aac40..0000000
--- a/3rdparty/libprocess/src/statistics.cpp
+++ /dev/null
@@ -1,331 +0,0 @@
-#include <glog/logging.h>
-
-#include <algorithm>
-#include <list>
-#include <map>
-#include <string>
-#include <vector>
-
-#include <process/clock.hpp>
-#include <process/delay.hpp>
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-#include <process/help.hpp>
-#include <process/http.hpp>
-#include <process/process.hpp>
-#include <process/statistics.hpp>
-#include <process/time.hpp>
-
-#include <stout/error.hpp>
-#include <stout/duration.hpp>
-#include <stout/foreach.hpp>
-#include <stout/hashmap.hpp>
-#include <stout/hashset.hpp>
-#include <stout/json.hpp>
-#include <stout/none.hpp>
-#include <stout/numify.hpp>
-#include <stout/option.hpp>
-#include <stout/stringify.hpp>
-#include <stout/strings.hpp>
-
-using namespace process;
-using namespace process::http;
-
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
-namespace process {
-
-// This is initialized by process::initialize().
-Statistics* statistics = NULL;
-
-const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
-
-// TODO(bmahler): Move these into timeseries.hpp header once we
-// can require gcc >= 4.2.1.
-const Duration TIME_SERIES_WINDOW = Weeks(2);
-const size_t TIME_SERIES_CAPACITY = 1000;
-
-class StatisticsProcess : public Process<StatisticsProcess>
-{
-public:
- StatisticsProcess(const Duration& _window, size_t _capacity)
- : ProcessBase("statistics"),
- window(_window),
- capacity(_capacity) {}
-
- virtual ~StatisticsProcess() {}
-
- // Statistics implementation.
- TimeSeries<double> timeseries(
- const string& context,
- const string& name);
-
- void set(
- const string& context,
- const string& name,
- double value,
- const Time& time);
-
- void increment(const string& context, const string& name);
-
- void decrement(const string& context, const string& name);
-
-protected:
- virtual void initialize()
- {
- route("/snapshot.json", SNAPSHOT_HELP, &StatisticsProcess::snapshot);
- route("/series.json", SERIES_HELP, &StatisticsProcess::series);
-
- // Schedule the first truncation.
- delay(STATISTICS_TRUNCATION_INTERVAL, self(), &StatisticsProcess::truncate);
- }
-
-private:
- static const string SNAPSHOT_HELP;
- static const string SERIES_HELP;
-
- // Removes values for all statistics that occurred outside the time
- // series window. We always ensure 1 value remains.
- // NOTE: Runs periodically every STATISTICS_TRUNCATION_INTERVAL.
- void truncate();
-
- // Returns the a snapshot of all statistics in JSON.
- Future<Response> snapshot(const Request& request);
-
- // Returns the time series of a statistic in JSON.
- Future<Response> series(const Request& request);
-
- const Duration window;
- const size_t capacity;
-
- // This maps from {context: {name: TimeSeries } }.
- hashmap<string, hashmap<string, TimeSeries<double> > > statistics;
-};
-
-
-const string StatisticsProcess::SERIES_HELP = HELP(
- TLDR(
- "Provides the time series for ..."),
- USAGE(
- "/statistics/series.json..."),
- DESCRIPTION(
- "...",
- "",
- "Query parameters:",
- "",
- "> param=VALUE Some description here"));
-
-
-const string StatisticsProcess::SNAPSHOT_HELP = HELP(
- TLDR(
- "Provides a snapshot of the current statistics ..."),
- USAGE(
- "/statistics/snapshot.json..."),
- DESCRIPTION(
- "...",
- "",
- "Query parameters:",
- "",
- "> param=VALUE Some description here"));
-
-
-TimeSeries<double> StatisticsProcess::timeseries(
- const string& context,
- const string& name)
-{
- if (!statistics.contains(context) || !statistics[context].contains(name)) {
- return TimeSeries<double>();
- }
-
- return statistics[context][name];
-}
-
-
-void StatisticsProcess::set(
- const string& context,
- const string& name,
- double value,
- const Time& time)
-{
- if (!statistics[context].contains(name)) {
- statistics[context][name] = TimeSeries<double>(window, capacity);
- }
- statistics[context][name].set(value, time);
-}
-
-
-void StatisticsProcess::increment(const string& context, const string& name)
-{
- double value = 0.0;
- if (statistics[context].contains(name) &&
- !statistics[context][name].empty()) {
- value = statistics[context][name].latest().get().data;
- }
- set(context, name, value + 1.0, Clock::now());
-}
-
-
-void StatisticsProcess::decrement(const string& context, const string& name)
-{
- double value = 0.0;
- if (statistics[context].contains(name) &&
- !statistics[context][name].empty()) {
- value = statistics[context][name].latest().get().data;
- }
- set(context, name, value - 1.0, Clock::now());
-}
-
-
-void StatisticsProcess::truncate()
-{
- foreachkey (const string& context, statistics) {
- foreachkey (const string& name, statistics[context]) {
- statistics[context][name].truncate();
- }
- }
-
- delay(STATISTICS_TRUNCATION_INTERVAL, self(), &StatisticsProcess::truncate);
-}
-
-
-Future<Response> StatisticsProcess::snapshot(const Request& request)
-{
- JSON::Array array;
-
- Option<string> queryContext = request.query.get("context");
- Option<string> queryName = request.query.get("name");
-
- foreachkey (const string& context, statistics) {
- foreachkey (const string& name, statistics[context]) {
- // Skip statistics that don't match the query, if present.
- if (queryContext.isSome() && queryContext.get() != context) {
- continue;
- } else if (queryName.isSome() && queryName.get() != name) {
- continue;
- }
-
- const Option<TimeSeries<double>::Value>& value =
- statistics[context][name].latest();
-
- if (value.isSome()) {
- JSON::Object object;
- object.values["context"] = context;
- object.values["name"] = name;
- object.values["time"] = value.get().time.secs();
- object.values["value"] = value.get().data;
- array.values.push_back(object);
- }
- }
- }
-
- return OK(array, request.query.get("jsonp"));
-}
-
-
-Future<Response> StatisticsProcess::series(const Request& request)
-{
- Option<string> context = request.query.get("context");
- Option<string> name = request.query.get("name");
-
- if (!context.isSome()) {
- return BadRequest("Expected 'context=val' in query.\n");
- } else if (!name.isSome()) {
- return BadRequest("Expected 'name=val' in query.\n");
- }
-
- Option<Time> start = None();
- Option<Time> stop = None();
-
- if (request.query.get("start").isSome()) {
- Try<double> result = numify<double>(request.query.get("start").get());
- if (result.isError()) {
- return BadRequest("Failed to parse 'start': " + result.error() + "\n.");
- }
-
- Try<Time> start_ = Time::create(result.get());
- if (start_.isError()) {
- return BadRequest("Failed to parse 'start': " + start_.error() + "\n.");
- }
- start = start_.get();
- }
-
- if (request.query.get("stop").isSome()) {
- Try<double> result = numify<double>(request.query.get("stop").get());
- if (result.isError()) {
- return BadRequest("Failed to parse 'stop': " + result.error() + "\n.");
- }
-
- Try<Time> stop_ = Time::create(result.get());
- if (stop_.isError()) {
- return BadRequest("Failed to parse 'stop': " + stop_.error() + "\n.");
- }
- stop = stop_.get();
- }
-
- if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
- return BadRequest("Invalid query: 'start' must be less than 'stop'\n.");
- }
-
- JSON::Array array;
-
- const vector<TimeSeries<double>::Value>& values =
- timeseries(context.get(), name.get()).get(start, stop);
-
- foreach (const TimeSeries<double>::Value& value, values) {
- JSON::Object object;
- object.values["time"] = value.time.secs();
- object.values["value"] = value.data;
- array.values.push_back(object);
- }
-
- return OK(array, request.query.get("jsonp"));
-}
-
-
-Statistics::Statistics(const Duration& window, size_t capacity)
-{
- process = new StatisticsProcess(window, capacity);
- spawn(process);
-}
-
-
-Statistics::~Statistics()
-{
- terminate(process);
- wait(process);
-}
-
-
-Future<TimeSeries<double> > Statistics::timeseries(
- const string& context,
- const string& name)
-{
- return dispatch(process, &StatisticsProcess::timeseries, context, name);
-}
-
-
-void Statistics::set(
- const string& context,
- const string& name,
- double value,
- const Time& time)
-{
- dispatch(process, &StatisticsProcess::set, context, name, value, time);
-}
-
-
-void Statistics::increment(const string& context, const string& name)
-{
- dispatch(process, &StatisticsProcess::increment, context, name);
-}
-
-
-void Statistics::decrement(const string& context, const string& name)
-{
- dispatch(process, &StatisticsProcess::decrement, context, name);
-}
-
-} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/src/tests/statistics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/statistics_tests.cpp b/3rdparty/libprocess/src/tests/statistics_tests.cpp
index 3521bd5..478453f 100644
--- a/3rdparty/libprocess/src/tests/statistics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/statistics_tests.cpp
@@ -1,93 +1,46 @@
-#include <gmock/gmock.h>
+#include <gtest/gtest.h>
#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/gmock.hpp>
-#include <process/gtest.hpp>
#include <process/statistics.hpp>
-#include <process/time.hpp>
#include <stout/duration.hpp>
-#include <stout/foreach.hpp>
-#include <stout/list.hpp>
+#include <stout/gtest.hpp>
using namespace process;
-List<double> toList(const TimeSeries<double>& series)
+TEST(Statistics, empty)
{
- List<double> result;
- foreach (const TimeSeries<double>::Value& value, series.get()) {
- result.push_back(value.data);
- }
- return result;
-}
-
-
-TEST(Statistics, set)
-{
- Statistics statistics(Days(1));
-
- // Set one using Clock::now() implicitly.
- statistics.set("test", "statistic", 3.0);
-
- // Set one using Clock::now() explicitly.
- Time now = Clock::now();
- statistics.set("test", "statistic", 4.0, now);
-
- Future<TimeSeries<double> > values =
- statistics.timeseries("test", "statistic");
+ TimeSeries<double> timeseries;
- AWAIT_ASSERT_READY(values);
-
- EXPECT_EQ(2, values.get().get().size());
-
- EXPECT_GE(Clock::now(), values.get().get().begin()->time);
- EXPECT_DOUBLE_EQ(3.0, values.get().get().begin()->data);
-
- EXPECT_EQ(List<double>(3.0, 4.0), toList(values.get()));
+ EXPECT_NONE(Statistics<double>::from(timeseries));
}
-TEST(Statistics, increment)
+TEST(Statistics, statistics)
{
- Statistics statistics;
- Future<TimeSeries<double> > values;
-
- statistics.increment("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(1.0), toList(values.get()));
-
- statistics.increment("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(1.0, 2.0), toList(values.get()));
+ // Create a distribution of 11 values: an initial 0.0, then -5 to 4.
+ TimeSeries<double> timeseries;
- statistics.increment("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(1.0, 2.0, 3.0), toList(values.get()));
-}
+ Time now = Clock::now();
+ timeseries.set(0.0, now);
+ for (int i = -5; i < 5; ++i) {
+ now += Seconds(1);
+ timeseries.set(i, now);
+ }
-TEST(Statistics, decrement)
-{
- Statistics statistics;
- Future<TimeSeries<double> > values;
+ Option<Statistics<double> > statistics = Statistics<double>::from(timeseries);
- statistics.decrement("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(-1.0), toList(values.get()));
+ EXPECT_SOME(statistics);
- statistics.decrement("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(-1.0, -2.0), toList(values.get()));
+ EXPECT_FLOAT_EQ(-5.0, statistics.get().min);
+ EXPECT_FLOAT_EQ(4.0, statistics.get().max);
- statistics.decrement("test", "statistic");
- values = statistics.timeseries("test", "statistic");
- AWAIT_ASSERT_READY(values);
- EXPECT_EQ(List<double>(-1.0, -2.0, -3.0), toList(values.get()));
+ EXPECT_FLOAT_EQ(0.0, statistics.get().p50);
+ EXPECT_FLOAT_EQ(3.0, statistics.get().p90);
+ EXPECT_FLOAT_EQ(3.5, statistics.get().p95);
+ EXPECT_FLOAT_EQ(3.9, statistics.get().p99);
+ EXPECT_FLOAT_EQ(3.99, statistics.get().p999);
+ EXPECT_FLOAT_EQ(3.999, statistics.get().p9999);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/12e1a967/3rdparty/libprocess/src/timeseries.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/timeseries.cpp b/3rdparty/libprocess/src/timeseries.cpp
new file mode 100644
index 0000000..89af8ec
--- /dev/null
+++ b/3rdparty/libprocess/src/timeseries.cpp
@@ -0,0 +1,14 @@
+#include <stddef.h> // For size_t.
+
+#include <process/timeseries.hpp>
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// TODO(bmahler): Move these into timeseries.hpp header once we
+// can require gcc >= 4.2.1.
+const Duration TIME_SERIES_WINDOW = Weeks(2);
+const size_t TIME_SERIES_CAPACITY = 1000;
+
+} // namespace process {