You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:41:58 UTC
[5/8] git commit: Exposed TimeSeries and added "sparsification" of
older values.
Exposed TimeSeries and added "sparsification" of older values.
Review: https://reviews.apache.org/r/13603
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/157e0118
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/157e0118
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/157e0118
Branch: refs/heads/master
Commit: 157e0118c574ea21a269238e88e588db6c4ec22d
Parents: 99e8417
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jul 23 16:23:59 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
.../libprocess/include/process/statistics.hpp | 49 ++--
.../libprocess/include/process/timeseries.hpp | 252 ++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 1 +
3rdparty/libprocess/src/statistics.cpp | 253 ++++---------------
.../libprocess/src/tests/statistics_tests.cpp | 103 +++-----
.../libprocess/src/tests/timeseries_tests.cpp | 191 ++++++++++++++
7 files changed, 557 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 47e6d02..40f01a7 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -108,6 +108,7 @@ tests_SOURCES = \
src/tests/process_tests.cpp \
src/tests/shared_tests.cpp \
src/tests/statistics_tests.cpp \
+ src/tests/timeseries_tests.cpp \
src/tests/time_tests.cpp
tests_CPPFLAGS = \
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/include/process/statistics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/statistics.hpp b/3rdparty/libprocess/include/process/statistics.hpp
index 8592c4a..a4f1db3 100644
--- a/3rdparty/libprocess/include/process/statistics.hpp
+++ b/3rdparty/libprocess/include/process/statistics.hpp
@@ -5,6 +5,7 @@
#include <process/future.hpp>
#include <process/owned.hpp>
#include <process/time.hpp>
+#include <process/timeseries.hpp>
#include <stout/duration.hpp>
#include <stout/none.hpp>
@@ -26,41 +27,32 @@ class StatisticsProcess;
// Statistics are exposed via JSON for external visibility.
extern Statistics* statistics;
-const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
-// Provides an in-memory time series of statistics over some window
-// (values are truncated outside of the window, but no limit is
-// currently placed on the number of values within a window).
-//
-// TODO(bmahler): Time series granularity should be coarsened over
-// time. This means, for high-frequency statistics, we keep a lot of
-// recent data points (fine granularity), and keep fewer older data
-// points (coarse granularity). The tunable bit here could be the
-// total number of data points to keep around, which informs how
-// often to delete older data points, while still keeping a window
-// worth of data.
+// Default statistic configuration variables.
+// TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
+// which these duration constants were not being initialized when
+// having static linkage. This issue did not manifest in newer gcc's.
+// Specifically, 4.2.1 was ok. So we've moved these to have external
+// linkage but perhaps in the future we can revert this.
+extern const Duration STATISTICS_TRUNCATION_INTERVAL;
+
+
+// Provides a collection of in-memory fixed capacity time series
+// of statistics over some window. Values are truncated when they
+// fall outside the window. "Sparsification" will occur when the
+// capacity of a time series is exceeded inside the window.
class Statistics
{
public:
- Statistics(const Duration& window);
+ Statistics(const Duration& window = TIME_SERIES_WINDOW,
+ size_t capacity = TIME_SERIES_CAPACITY);
~Statistics();
// Returns the time series of a statistic.
- process::Future<std::map<Time, double> > timeseries(
- const std::string& context,
- const std::string& name,
- const Option<Time>& start = None(),
- const Option<Time>& stop = None());
-
- // Returns the latest value of a statistic.
- process::Future<Option<double> > get(
+ process::Future<TimeSeries<double> > timeseries(
const std::string& context,
const std::string& name);
- // Returns the latest values of all statistics in the context.
- process::Future<std::map<std::string, double> > get(
- const std::string& context);
-
// Sets the current value of a statistic at the current clock time
// or at a specified time.
void set(
@@ -69,13 +61,6 @@ public:
double value,
const Time& time = Clock::now());
- // Archives the provided statistic time series.
- // This means two things:
- // 1. The statistic will no longer be part of the snapshot.
- // 2. However, the time series will be retained until the window
- // expiration.
- void archive(const std::string& context, const std::string& name);
-
// Increments the current value of a statistic. If no statistic was
// previously present, an initial value of 0.0 is used.
void increment(const std::string& context, const std::string& name);
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/include/process/timeseries.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timeseries.hpp b/3rdparty/libprocess/include/process/timeseries.hpp
new file mode 100644
index 0000000..b368b9b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/timeseries.hpp
@@ -0,0 +1,252 @@
+#ifndef __PROCESS_TIMESERIES_HPP__
+#define __PROCESS_TIMESERIES_HPP__
+
+#include <algorithm> // For max.
+#include <map>
+#include <vector>
+
+#include <process/clock.hpp>
+#include <process/time.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+
+namespace process {
+
+// Default statistic configuration variables.
+// TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
+// which these duration constants were not being initialized when
+// having static linkage. This issue did not manifest in newer gcc's.
+// Specifically, 4.2.1 was ok. So we've moved these to have external
+// linkage but perhaps in the future we can revert this.
+extern const Duration TIME_SERIES_WINDOW;
+extern const size_t TIME_SERIES_CAPACITY;
+
+
+// Provides an in-memory time series of statistics over some window.
+// When the time series capacity is exceeded within the window, the
+// granularity of older values is coarsened. This means, for
+// high-frequency statistics that exceed the capacity, we keep a lot
+// of recent data points (fine granularity), and keep fewer older
+// data points (coarse granularity). The tunable bit here is the
+// total number of data points to keep around, which informs how
+// often to delete older data points, while still keeping a window
+// worth of data.
+// TODO(bmahler): Investigate using Google's btree implementation.
+// This provides better insertion and lookup performance for large
+// containers. This _should_ also provide significant memory
+// savings. These are true because we have the following properties:
+// 1. Our insertion order will mostly be in sorted order.
+// 2. Our keys (Seconds) have efficient comparison operators.
+// See: http://code.google.com/p/cpp-btree/
+// http://code.google.com/p/cpp-btree/wiki/UsageInstructions
+template <typename T>
+struct TimeSeries
+{
+ TimeSeries(const Duration& _window = TIME_SERIES_WINDOW,
+ size_t _capacity = TIME_SERIES_CAPACITY)
+ : window(_window),
+ // The truncation technique requires at least 3 elements.
+ capacity(std::max((size_t) 3, _capacity)) {}
+
+ struct Value
+ {
+ Value(const Time& _time, const T& _data) : time(_time), data(_data) {}
+
+ // Non-const for assignability.
+ Time time;
+ T data;
+ };
+
+ void set(const T& value, const Time& time = Clock::now())
+ {
+ // If we're not inserting at the end of the time series, then
+ // we have to reset the sparsification index. Given that
+ // out-of-order insertion is a rare use-case, this is a simple
+ // way to keep insertions O(log(n)): there is no need to figure
+ // out how to adjust the sparsification index.
+ if (!values.empty() && time < values.rbegin()->first) {
+ index = None();
+ }
+
+ values[time] = value;
+ truncate();
+ sparsify();
+ }
+
+ // Returns the time series within the (optional) time range.
+ std::vector<Value> get(
+ const Option<Time>& start = None(),
+ const Option<Time>& stop = None()) const
+ {
+ // Ignore invalid ranges.
+ if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
+ return std::vector<Value>();
+ }
+
+ typename std::map<Time, T>::const_iterator lower = values.lower_bound(
+ start.isSome() ? start.get() : Time::EPOCH);
+
+ typename std::map<Time, T>::const_iterator upper = values.upper_bound(
+ stop.isSome() ? stop.get() : Time::MAX);
+
+ std::vector<Value> values;
+ while (lower != upper) {
+ values.push_back(Value(lower->first, lower->second));
+ ++lower;
+ }
+ return values;
+ }
+
+ Option<Value> latest() const
+ {
+ if (empty()) {
+ return None();
+ }
+
+ return Value(values.rbegin()->first, values.rbegin()->second);
+ }
+
+ bool empty() const { return values.empty(); }
+
+ // Removes values outside the time window. This will ensure at
+ // least one value remains. Note that this is called automatically
+ // when writing to the time series, so this is only needed when
+ // one wants to explicitly trigger a truncation.
+ void truncate()
+ {
+ Time expired = Clock::now() - window;
+ typename std::map<Time, T>::iterator upper_bound =
+ values.upper_bound(expired);
+
+ // Ensure at least 1 value remains.
+ if (values.size() <= 1 || upper_bound == values.end()) {
+ return;
+ }
+
+ // When truncating and there exists a next value considered
+ // for sparsification, there are two cases to consider for
+ // updating the index:
+ //
+ // Case 1: upper_bound < next
+ // ----------------------------------------------------------
+ // upper_bound index, next
+ // v v
+ // Before: 0 1 2 3 4 5 6 7 ...
+ // ----------------------------------------------------------
+ // next index After truncating, index is
+ // v v must be adjusted:
+ // Truncate: 3 4 5 6 7 ... index -= # elements removed
+ // ----------------------------------------------------------
+ // index, next
+ // v
+ // After: 3 4 5 6 7 ...
+ // ----------------------------------------------------------
+ //
+ // Case 2: upper_bound >= next
+ // ----------------------------------------------------------
+ // upper_bound, index, next
+ // v
+ // Before: 0 1 2 3 4 5 6 7 ...
+ // ----------------------------------------------------------
+ // After truncating, we must
+ // After: 4 5 6 7 ... reset index to None().
+ // ----------------------------------------------------------
+ if (index.isSome() && upper_bound->first < next->first) {
+ size_t size = values.size();
+ values.erase(values.begin(), upper_bound);
+ index = index.get() - (size - values.size());
+ } else {
+ index = None();
+ values.erase(values.begin(), upper_bound);
+ }
+ }
+
+private:
+ // Performs "sparsification" to limit the size of the time series
+ // to be within the capacity.
+ //
+ // The sparsifying technique is to iteratively halve the granularity
+ // of the older half of the time series. Once sparsification reaches
+ // the midpoint of the time series, it begins again from the
+ // beginning.
+ //
+ // Sparsification results in the following granularity over time:
+ // Initial: | ------------------------ A -------------------- |
+ // Stage 1: | ------- 1/2 A ---------- | -------- B --------- |
+ // Stage 2: | -- 1/4A --- | -- 1/2B -- | -------- C --------- |
+ // Stage 3: | 1/8A | 1/4B | -- 1/2C -- | -------- D --------- |
+ // ...
+ //
+ // Each stage halves the size and granularity of time series prior
+ // to sparsifying.
+ void sparsify()
+ {
+ // We remove every other element up to the halfway point of the
+ // time series, until we're within the capacity. If we reach the
+ // half-way point of the time series, we'll start another
+ // sparsification cycle from the beginning, for example:
+ //
+ //  next            Time series with a capacity of 7.
+ //   v              Initial state with 7 entries.
+ // 0 1 2 3 4 5 6
+ //
+ //    next          Insert '7'.
+ //     v            Capacity is exceeded, we remove '1' and
+ // 0 2 3 4 5 6 7    advance to remove '3' next.
+ //
+ //      next        Insert '8'.
+ //       v          Capacity is exceeded, we remove '3' and
+ // 0 2 4 5 6 7 8    advance to remove '5' next.
+ //
+ //        next      Insert '9'.
+ //         v        Capacity is exceeded, we remove '5' and now
+ // 0 2 4 6 7 8 9    '7' is past the halfway mark, so we will
+ //                  reset to the beginning and consider '2'.
+
+ while (values.size() > capacity) {
+ // If the index is uninitialized, or past the half-way point,
+ // we set it back to the beginning.
+ if (index.isNone() || index.get() > values.size() / 2) {
+ // The second element is the initial deletion candidate.
+ next = values.begin();
+ ++next;
+ index = 1;
+ }
+
+#if __cplusplus >= 201103L
+ next = values.erase(next);
+ next++; // Skip one element.
+#else
+ // Store the next deletion candidate.
+ typename std::map<Time, T>::iterator copy = next;
+ copy++; // Skip every other element.
+ copy++; // Next deletion candidate.
+
+ values.erase(next);
+ next = copy;
+#endif
+ index = index.get() + 1;
+ }
+ }
+
+ // Non-const for assignability.
+ Duration window;
+ size_t capacity;
+
+ // We use a map instead of a hashmap to store the values because
+ // that way we can retrieve a series in sorted order efficiently.
+ std::map<Time, T> values;
+
+ // Next deletion candidate. We store both the iterator and index.
+ // The index is None initially, and is reset to None whenever a
+ // value is inserted out-of-order or truncate() cannot preserve the
+ // candidate. This means 'next' is only valid when 'index' is Some.
+ typename std::map<Time, T>::iterator next;
+ Option<size_t> index;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_TIMESERIES_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 936f118..bc7a1c5 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -593,6 +593,7 @@ ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
+
// We namespace the clock related variables to keep them well
// named. In the future we'll probably want to associate a clock with
// a specific ProcessManager/SocketManager instance pair, so this will
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/statistics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/statistics.cpp b/3rdparty/libprocess/src/statistics.cpp
index 07990d1..75aac40 100644
--- a/3rdparty/libprocess/src/statistics.cpp
+++ b/3rdparty/libprocess/src/statistics.cpp
@@ -4,7 +4,6 @@
#include <list>
#include <map>
#include <string>
-#include <utility>
#include <vector>
#include <process/clock.hpp>
@@ -42,46 +41,27 @@ namespace process {
// This is initialized by process::initialize().
Statistics* statistics = NULL;
-// TODO(bmahler): Move time series related logic into this struct.
-// TODO(bmahler): Investigate using Google's sparse_hash_map.
-// Also investigate using Google's btree implementation.
-// This provides better insertion and lookup performance for large
-// containers. This _should_ also provide significant memory
-// savings, especially since:
-// 1. Our insertion order will mostly be in sorted order.
-// 2. Our keys (Seconds) have efficient comparison operators.
-// See: http://code.google.com/p/cpp-btree/
-// http://code.google.com/p/cpp-btree/wiki/UsageInstructions
-struct TimeSeries
-{
- TimeSeries() : values(), archived(false) {}
-
- // We use a map instead of a hashmap to store the values because
- // that way we can retrieve a series in sorted order efficiently.
- map<Time, double> values;
- bool archived;
-};
+const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
+// TODO(bmahler): Move these into timeseries.hpp header once we
+// can require gcc >= 4.2.1.
+const Duration TIME_SERIES_WINDOW = Weeks(2);
+const size_t TIME_SERIES_CAPACITY = 1000;
class StatisticsProcess : public Process<StatisticsProcess>
{
public:
- StatisticsProcess(const Duration& _window)
+ StatisticsProcess(const Duration& _window, size_t _capacity)
: ProcessBase("statistics"),
- window(_window) {}
+ window(_window),
+ capacity(_capacity) {}
virtual ~StatisticsProcess() {}
// Statistics implementation.
- map<Time, double> timeseries(
+ TimeSeries<double> timeseries(
const string& context,
- const string& name,
- const Option<Time>& start,
- const Option<Time>& stop);
-
- Option<double> get(const string& context, const string& name);
-
- map<string, double> get(const string& context);
+ const string& name);
void set(
const string& context,
@@ -89,8 +69,6 @@ public:
double value,
const Time& time);
- void archive(const string& context, const string& name);
-
void increment(const string& context, const string& name);
void decrement(const string& context, const string& name);
@@ -109,18 +87,9 @@ private:
static const string SNAPSHOT_HELP;
static const string SERIES_HELP;
- // Removes values for the specified statistic that occurred outside
- // the time series window.
- // NOTE: We always ensure there is at least 1 value left for a statistic,
- // unless it is archived!
- // Returns true iff the time series is empty.
- bool truncate(const string& context, const string& name);
-
// Removes values for all statistics that occurred outside the time
- // series window.
+ // series window. We always ensure 1 value remains.
// NOTE: Runs periodically every STATISTICS_TRUNCATION_INTERVAL.
- // NOTE: We always ensure there is at least 1 value left for a statistic,
- // unless it is archived.
void truncate();
// Returns a snapshot of all statistics in JSON.
@@ -130,9 +99,10 @@ private:
Future<Response> series(const Request& request);
const Duration window;
+ const size_t capacity;
// This maps from {context: {name: TimeSeries } }.
- hashmap<string, hashmap<string, TimeSeries> > statistics;
+ hashmap<string, hashmap<string, TimeSeries<double> > > statistics;
};
@@ -162,58 +132,15 @@ const string StatisticsProcess::SNAPSHOT_HELP = HELP(
"> param=VALUE Some description here"));
-map<Time, double> StatisticsProcess::timeseries(
+TimeSeries<double> StatisticsProcess::timeseries(
const string& context,
- const string& name,
- const Option<Time>& start,
- const Option<Time>& stop)
+ const string& name)
{
if (!statistics.contains(context) || !statistics[context].contains(name)) {
- return map<Time, double>();
+ return TimeSeries<double>();
}
- const std::map<Time, double>& values =
- statistics[context].find(name)->second.values;
-
- map<Time, double>::const_iterator lower = values.lower_bound(start.isSome()
- ? start.get() : Time::EPOCH);
-
- map<Time, double>::const_iterator upper = values.upper_bound(stop.isSome()
- ? stop.get() : Time::MAX);
-
- return map<Time, double>(lower, upper);
-}
-
-
-Option<double> StatisticsProcess::get(const string& context, const string& name)
-{
- if (!statistics.contains(context) ||
- !statistics[context].contains(name) ||
- statistics[context][name].values.empty()) {
- return Option<double>::none();
- } else {
- return statistics[context][name].values.rbegin()->second;
- }
-}
-
-
-map<string, double> StatisticsProcess::get(const string& context)
-{
- map<string, double> results;
-
- if (!statistics.contains(context)) {
- return results;
- }
-
- foreachkey (const string& name, statistics[context]) {
- const map<Time, double>& values = statistics[context][name].values;
-
- if (!values.empty()) {
- results[name] = values.rbegin()->second;
- }
- }
-
- return results;
+ return statistics[context][name];
}
@@ -223,25 +150,19 @@ void StatisticsProcess::set(
double value,
const Time& time)
{
- statistics[context][name].values[time] = value; // Update the raw value.
- statistics[context][name].archived = false; // Unarchive.
-
- truncate(context, name);
-}
-
-
-void StatisticsProcess::archive(const string& context, const string& name)
-{
- // Exclude the statistic from the snapshot.
- statistics[context][name].archived = true;
+ if (!statistics[context].contains(name)) {
+ statistics[context][name] = TimeSeries<double>(window, capacity);
+ }
+ statistics[context][name].set(value, time);
}
void StatisticsProcess::increment(const string& context, const string& name)
{
double value = 0.0;
- if (!statistics[context][name].values.empty()) {
- value = statistics[context][name].values.rbegin()->second;
+ if (statistics[context].contains(name) &&
+ !statistics[context][name].empty()) {
+ value = statistics[context][name].latest().get().data;
}
set(context, name, value + 1.0, Clock::now());
}
@@ -250,59 +171,19 @@ void StatisticsProcess::increment(const string& context, const string& name)
void StatisticsProcess::decrement(const string& context, const string& name)
{
double value = 0.0;
- if (!statistics[context][name].values.empty()) {
- value = statistics[context][name].values.rbegin()->second;
+ if (statistics[context].contains(name) &&
+ !statistics[context][name].empty()) {
+ value = statistics[context][name].latest().get().data;
}
set(context, name, value - 1.0, Clock::now());
}
-bool StatisticsProcess::truncate(const string& context, const string& name)
-{
- CHECK(statistics.contains(context));
- CHECK(statistics[context].contains(name));
-
- if (statistics[context][name].values.empty()) {
- return true; // No truncation is needed, the time series is already empty.
- }
-
- map<Time, double>::iterator start =
- statistics[context][name].values.begin();
-
- while ((Clock::now() - start->first) > window) {
- // Always keep at least one value for a statistic, unless it's archived!
- if (statistics[context][name].values.size() == 1) {
- if (statistics[context][name].archived) {
- statistics[context][name].values.clear();
- }
- break;
- }
-
- statistics[context][name].values.erase(start);
- start = statistics[context][name].values.begin();
- }
-
- return statistics[context][name].values.empty();
-}
-
-
void StatisticsProcess::truncate()
{
- hashmap<string, hashset<string> > empties;
-
foreachkey (const string& context, statistics) {
foreachkey (const string& name, statistics[context]) {
- // Keep track of the emptied timeseries.
- if (truncate(context, name)) {
- empties[context].insert(name);
- }
- }
- }
-
- // Remove the empty timeseries.
- foreachkey (const string& context, empties) {
- foreach (const string& name, empties[context]) {
- statistics[context].erase(name);
+ statistics[context][name].truncate();
}
}
@@ -319,12 +200,6 @@ Future<Response> StatisticsProcess::snapshot(const Request& request)
foreachkey (const string& context, statistics) {
foreachkey (const string& name, statistics[context]) {
- // Exclude archived and empty time series.
- if (statistics[context][name].archived ||
- statistics[context][name].values.empty()) {
- continue;
- }
-
// Skip statistics that don't match the query, if present.
if (queryContext.isSome() && queryContext.get() != context) {
continue;
@@ -332,14 +207,17 @@ Future<Response> StatisticsProcess::snapshot(const Request& request)
continue;
}
- JSON::Object object;
- object.values["context"] = context;
- object.values["name"] = name;
- object.values["time"] =
- statistics[context][name].values.rbegin()->first.secs();
- object.values["value"] =
- statistics[context][name].values.rbegin()->second;
- array.values.push_back(object);
+ const Option<TimeSeries<double>::Value>& value =
+ statistics[context][name].latest();
+
+ if (value.isSome()) {
+ JSON::Object object;
+ object.values["context"] = context;
+ object.values["name"] = name;
+ object.values["time"] = value.get().time.secs();
+ object.values["value"] = value.get().data;
+ array.values.push_back(object);
+ }
}
}
@@ -364,12 +242,12 @@ Future<Response> StatisticsProcess::series(const Request& request)
if (request.query.get("start").isSome()) {
Try<double> result = numify<double>(request.query.get("start").get());
if (result.isError()) {
- return BadRequest("Failed to parse 'start': " + result.error());
+ return BadRequest("Failed to parse 'start': " + result.error() + "\n.");
}
Try<Time> start_ = Time::create(result.get());
if (start_.isError()) {
- return BadRequest("Failed to parse 'start': " + start_.error());
+ return BadRequest("Failed to parse 'start': " + start_.error() + "\n.");
}
start = start_.get();
}
@@ -377,25 +255,29 @@ Future<Response> StatisticsProcess::series(const Request& request)
if (request.query.get("stop").isSome()) {
Try<double> result = numify<double>(request.query.get("stop").get());
if (result.isError()) {
- return BadRequest("Failed to parse 'stop': " + result.error());
+ return BadRequest("Failed to parse 'stop': " + result.error() + "\n.");
}
Try<Time> stop_ = Time::create(result.get());
if (stop_.isError()) {
- return BadRequest("Failed to parse 'stop': " + stop_.error());
+ return BadRequest("Failed to parse 'stop': " + stop_.error() + "\n.");
}
stop = stop_.get();
}
+ if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
+ return BadRequest("Invalid query: 'start' must be less than 'stop'\n.");
+ }
+
JSON::Array array;
- const map<Time, double>& values =
- timeseries(context.get(), name.get(), start, stop);
+ const vector<TimeSeries<double>::Value>& values =
+ timeseries(context.get(), name.get()).get(start, stop);
- foreachpair (const Time& s, double value, values) {
+ foreach (const TimeSeries<double>::Value& value, values) {
JSON::Object object;
- object.values["time"] = s.secs();
- object.values["value"] = value;
+ object.values["time"] = value.time.secs();
+ object.values["value"] = value.data;
array.values.push_back(object);
}
@@ -403,9 +285,9 @@ Future<Response> StatisticsProcess::series(const Request& request)
}
-Statistics::Statistics(const Duration& window)
+Statistics::Statistics(const Duration& window, size_t capacity)
{
- process = new StatisticsProcess(window);
+ process = new StatisticsProcess(window, capacity);
spawn(process);
}
@@ -417,28 +299,11 @@ Statistics::~Statistics()
}
-Future<map<Time, double> > Statistics::timeseries(
- const string& context,
- const string& name,
- const Option<Time>& start,
- const Option<Time>& stop)
-{
- return dispatch(
- process, &StatisticsProcess::timeseries, context, name, start, stop);
-}
-
-
-Future<Option<double> > Statistics::get(
+Future<TimeSeries<double> > Statistics::timeseries(
const string& context,
const string& name)
{
- return dispatch(process, &StatisticsProcess::get, context, name);
-}
-
-
-Future<map<string, double> > Statistics::get(const string& context)
-{
- return dispatch(process, &StatisticsProcess::get, context);
+ return dispatch(process, &StatisticsProcess::timeseries, context, name);
}
@@ -452,12 +317,6 @@ void Statistics::set(
}
-void Statistics::archive(const string& context, const string& name)
-{
- dispatch(process, &StatisticsProcess::archive, context, name);
-}
-
-
void Statistics::increment(const string& context, const string& name)
{
dispatch(process, &StatisticsProcess::increment, context, name);
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/tests/statistics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/statistics_tests.cpp b/3rdparty/libprocess/src/tests/statistics_tests.cpp
index 1acf47c..1382653 100644
--- a/3rdparty/libprocess/src/tests/statistics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/statistics_tests.cpp
@@ -1,18 +1,28 @@
#include <gmock/gmock.h>
-#include <map>
-
#include <process/clock.hpp>
#include <process/future.hpp>
+#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/statistics.hpp>
#include <process/time.hpp>
#include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/list.hpp>
using namespace process;
-using std::map;
+
+// Overload for testing time series equality.
+bool operator == (const List<double>& list, const TimeSeries<double>& series)
+{
+ List<double> result;
+ foreach (const TimeSeries<double>::Value& value, series.get()) {
+ result.push_back(value.data);
+ }
+ return list == result;
+}
TEST(Statistics, set)
@@ -26,94 +36,59 @@ TEST(Statistics, set)
Time now = Clock::now();
statistics.set("test", "statistic", 4.0, now);
- Future<map<Time, double> > values =
+ Future<TimeSeries<double> > values =
statistics.timeseries("test", "statistic");
AWAIT_ASSERT_READY(values);
- EXPECT_EQ(2, values.get().size());
+ EXPECT_EQ(2, values.get().get().size());
- EXPECT_GE(Clock::now(), values.get().begin()->first);
- EXPECT_DOUBLE_EQ(3.0, values.get().begin()->second);
+ EXPECT_GE(Clock::now(), values.get().get().begin()->time);
+ EXPECT_DOUBLE_EQ(3.0, values.get().get().begin()->data);
- EXPECT_EQ(1, values.get().count(now));
- EXPECT_DOUBLE_EQ(4.0, values.get()[now]);
+ EXPECT_EQ(List<double>(3.0, 4.0), values.get());
}
-TEST(Statistics, truncate)
+TEST(Statistics, increment)
{
- Clock::pause();
-
- Statistics statistics(Days(1));
-
- statistics.set("test", "statistic", 3.0);
-
- Future<map<Time, double> > values =
- statistics.timeseries("test", "statistic");
+ Statistics statistics;
+ Future<TimeSeries<double> > values;
+ statistics.increment("test", "statistic");
+ values = statistics.timeseries("test", "statistic");
AWAIT_ASSERT_READY(values);
-
- EXPECT_EQ(1, values.get().size());
- EXPECT_GE(Clock::now(), values.get().begin()->first);
- EXPECT_DOUBLE_EQ(3.0, values.get().begin()->second);
-
- Clock::advance(Days(1) + Seconds(1));
- Clock::settle();
+ EXPECT_EQ(List<double>(1.0), values.get());
statistics.increment("test", "statistic");
-
values = statistics.timeseries("test", "statistic");
-
AWAIT_ASSERT_READY(values);
+ EXPECT_EQ(List<double>(1.0, 2.0), values.get());
- EXPECT_EQ(1, values.get().size());
- EXPECT_GE(Clock::now(), values.get().begin()->first);
- EXPECT_DOUBLE_EQ(4.0, values.get().begin()->second);
-
- Clock::resume();
+ statistics.increment("test", "statistic");
+ values = statistics.timeseries("test", "statistic");
+ AWAIT_ASSERT_READY(values);
+ EXPECT_EQ(List<double>(1.0, 2.0, 3.0), values.get());
}
-TEST(Statistics, archive)
+TEST(Statistics, decrement)
{
- Clock::pause();
-
- Statistics statistics(Seconds(10));
-
- Time now = Clock::now();
- statistics.set("test", "statistic", 1.0, now);
- statistics.set("test", "statistic", 2.0, Time(now + Seconds(1)));
+ Statistics statistics;
+ Future<TimeSeries<double> > values;
- // Archive and ensure the following:
- // 1. The statistic will no longer be part of the snapshot.
- // 2. However, the time series will be retained until the window
- // expiration.
- statistics.archive("test", "statistic");
-
- // TODO(bmahler): Wait for JSON parsing to verify number 1.
-
- // Ensure the raw time series is present.
- Future<map<Time, double> > values =
- statistics.timeseries("test", "statistic");
+ statistics.decrement("test", "statistic");
+ values = statistics.timeseries("test", "statistic");
AWAIT_ASSERT_READY(values);
- EXPECT_FALSE(values.get().empty());
+ EXPECT_EQ(List<double>(-1.0), values.get());
- // Expire the window and ensure the statistics were removed.
- Clock::advance(STATISTICS_TRUNCATION_INTERVAL);
- Clock::settle();
-
- // Ensure the raw statistics are gone.
+ statistics.decrement("test", "statistic");
values = statistics.timeseries("test", "statistic");
AWAIT_ASSERT_READY(values);
- EXPECT_TRUE(values.get().empty());
-
- // Reactivate the statistic, and make sure we can retrieve it.
- statistics.set("test", "statistic", 1.0, now);
+ EXPECT_EQ(List<double>(-1.0, -2.0), values.get());
+ statistics.decrement("test", "statistic");
values = statistics.timeseries("test", "statistic");
AWAIT_ASSERT_READY(values);
- EXPECT_FALSE(values.get().empty());
-
- Clock::resume();
+ EXPECT_EQ(List<double>(-1.0, -2.0, -3.0), values.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/tests/timeseries_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/timeseries_tests.cpp b/3rdparty/libprocess/src/tests/timeseries_tests.cpp
new file mode 100644
index 0000000..0fa966e
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/timeseries_tests.cpp
@@ -0,0 +1,191 @@
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/timeseries.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+
+using namespace process;
+
+
+// Overload for testing time series equality.
+bool operator == (const List<int>& list, const TimeSeries<int>& series)
+{
+ List<int> result;
+ foreach (const TimeSeries<int>::Value& value, series.get()) {
+ result.push_back(value.data);
+ }
+ return list == result;
+}
+
+
+TEST(TimeSeries, set)
+{
+ TimeSeries<int> series;
+
+ ASSERT_TRUE(series.empty());
+
+ series.set(1);
+
+ ASSERT_FALSE(series.empty());
+
+ const Option<TimeSeries<int>::Value> latest = series.latest();
+
+ ASSERT_SOME(latest);
+ ASSERT_EQ(1, latest.get().data);
+}
+
+TEST(TimeSeries, sparsify)
+{
+ // Create a time series and fill it to its capacity.
+ TimeSeries<int> series(Duration::max(), 10);
+
+ series.set(0);
+ series.set(1);
+ series.set(2);
+ series.set(3);
+ series.set(4);
+ series.set(5);
+ series.set(6);
+ series.set(7);
+ series.set(8);
+ series.set(9);
+
+ ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+ // Verify the sparsification pattern.
+ series.set(10);
+ ASSERT_EQ(List<int>(0, 2, 3, 4, 5, 6, 7, 8, 9, 10), series);
+
+ series.set(11);
+ ASSERT_EQ(List<int>(0, 2, 4, 5, 6, 7, 8, 9, 10, 11), series);
+
+ series.set(12);
+ ASSERT_EQ(List<int>(0, 2, 4, 6, 7, 8, 9, 10, 11, 12), series);
+
+ series.set(13);
+ ASSERT_EQ(List<int>(0, 2, 4, 6, 8, 9, 10, 11, 12, 13), series);
+
+ series.set(14);
+ ASSERT_EQ(List<int>(0, 2, 4, 6, 8, 10, 11, 12, 13, 14), series);
+
+ // Now we expect a new round of sparsification to occur, starting
+ // again from the beginning.
+ series.set(15);
+ ASSERT_EQ(List<int>(0, 4, 6, 8, 10, 11, 12, 13, 14, 15), series);
+
+ series.set(16);
+ ASSERT_EQ(List<int>(0, 4, 8, 10, 11, 12, 13, 14, 15, 16), series);
+
+ series.set(17);
+ ASSERT_EQ(List<int>(0, 4, 8, 11, 12, 13, 14, 15, 16, 17), series);
+
+ series.set(18);
+ ASSERT_EQ(List<int>(0, 4, 8, 11, 13, 14, 15, 16, 17, 18), series);
+
+ series.set(19);
+ ASSERT_EQ(List<int>(0, 4, 8, 11, 13, 15, 16, 17, 18, 19), series);
+}
+
+
+TEST(TimeSeries, truncate)
+{
+ // Test simple truncation first.
+ Clock::pause();
+ Time now = Clock::now();
+
+ // Create a time series and fill it to its capacity.
+ TimeSeries<int> series(Seconds(10), 10);
+
+ series.set(0, now);
+ series.set(1, now + Seconds(1));
+ series.set(2, now + Seconds(2));
+ series.set(3, now + Seconds(3));
+ series.set(4, now + Seconds(4));
+ series.set(5, now + Seconds(5));
+ series.set(6, now + Seconds(6));
+ series.set(7, now + Seconds(7));
+ series.set(8, now + Seconds(8));
+ series.set(9, now + Seconds(9));
+
+ ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+ // Cause the values 0 through 6 to be truncated from the window.
+ Clock::advance(Seconds(10 + 6));
+ series.set(10, now + Seconds(10));
+
+ ASSERT_EQ(List<int>(7, 8, 9, 10), series);
+
+ Clock::resume();
+
+
+ // Now test truncation in the face of sparsification.
+ Clock::pause();
+ now = Clock::now();
+
+ series = TimeSeries<int>(Seconds(10), 10);
+
+ series.set(0, now);
+ series.set(1, now + Seconds(1));
+ series.set(2, now + Seconds(2));
+ series.set(3, now + Seconds(3));
+ series.set(4, now + Seconds(4));
+ series.set(5, now + Seconds(5));
+ series.set(6, now + Seconds(6));
+ series.set(7, now + Seconds(7));
+ series.set(8, now + Seconds(8));
+ series.set(9, now + Seconds(9));
+
+ ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+ // Move the sparsification candidate forward to ensure sparsification
+ // is correct after a truncation occurs.
+ series.set(10, now + Seconds(10));
+ ASSERT_EQ(List<int>(0, 2, 3, 4, 5, 6, 7, 8, 9, 10), series);
+
+ series.set(11, now + Seconds(11));
+ ASSERT_EQ(List<int>(0, 2, 4, 5, 6, 7, 8, 9, 10, 11), series);
+
+ series.set(12, now + Seconds(12));
+ ASSERT_EQ(List<int>(0, 2, 4, 6, 7, 8, 9, 10, 11, 12), series);
+
+ // Now the next sparsification candidate is '7'. First, we will
+ // truncate excluding '7' and ensure sparsification proceeds as
+ // expected.
+ Clock::advance(Seconds(10 + 2));
+ series.truncate();
+ ASSERT_EQ(List<int>(4, 6, 7, 8, 9, 10, 11, 12), series);
+
+ // Add 2 more items to return to capacity.
+ series.set(13, now + Seconds(13));
+ series.set(14, now + Seconds(14));
+ ASSERT_EQ(List<int>(4, 6, 7, 8, 9, 10, 11, 12, 13, 14), series);
+
+ // Now cause the time series to exceed capacity and ensure we
+ // correctly remove '7'.
+ series.set(15, now + Seconds(15));
+ ASSERT_EQ(List<int>(4, 6, 8, 9, 10, 11, 12, 13, 14, 15), series);
+
+ // Finally, let's truncate into the next sparsification candidate
+ // '9', and ensure sparsification is reset.
+ Clock::advance(Seconds(7)); // 2 + 7 = 9.
+ series.truncate();
+ ASSERT_EQ(List<int>(10, 11, 12, 13, 14, 15), series);
+
+ // Get back to capacity and ensure sparsification starts from the
+ // beginning.
+ series.set(16, now + Seconds(16));
+ series.set(17, now + Seconds(17));
+ series.set(18, now + Seconds(18));
+ series.set(19, now + Seconds(19));
+ ASSERT_EQ(List<int>(10, 11, 12, 13, 14, 15, 16, 17, 18, 19), series);
+
+ // Did we sparsify from the beginning?
+ series.set(20, now + Seconds(20));
+ ASSERT_EQ(List<int>(10, 12, 13, 14, 15, 16, 17, 18, 19, 20), series);
+
+ // Done!
+ Clock::resume();
+}