You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/01/22 22:41:58 UTC

[5/8] git commit: Exposed TimeSeries and added "sparsification" of older values.

Exposed TimeSeries and added "sparsification" of older values.

Review: https://reviews.apache.org/r/13603


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/157e0118
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/157e0118
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/157e0118

Branch: refs/heads/master
Commit: 157e0118c574ea21a269238e88e588db6c4ec22d
Parents: 99e8417
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Jul 23 16:23:59 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Jan 22 13:03:11 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   1 +
 .../libprocess/include/process/statistics.hpp   |  49 ++--
 .../libprocess/include/process/timeseries.hpp   | 252 ++++++++++++++++++
 3rdparty/libprocess/src/process.cpp             |   1 +
 3rdparty/libprocess/src/statistics.cpp          | 253 ++++---------------
 .../libprocess/src/tests/statistics_tests.cpp   | 103 +++-----
 .../libprocess/src/tests/timeseries_tests.cpp   | 191 ++++++++++++++
 7 files changed, 557 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 47e6d02..40f01a7 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -108,6 +108,7 @@ tests_SOURCES =							\
   src/tests/process_tests.cpp					\
   src/tests/shared_tests.cpp					\
   src/tests/statistics_tests.cpp				\
+  src/tests/timeseries_tests.cpp				\
   src/tests/time_tests.cpp
 
 tests_CPPFLAGS =			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/include/process/statistics.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/statistics.hpp b/3rdparty/libprocess/include/process/statistics.hpp
index 8592c4a..a4f1db3 100644
--- a/3rdparty/libprocess/include/process/statistics.hpp
+++ b/3rdparty/libprocess/include/process/statistics.hpp
@@ -5,6 +5,7 @@
 #include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/time.hpp>
+#include <process/timeseries.hpp>
 
 #include <stout/duration.hpp>
 #include <stout/none.hpp>
@@ -26,41 +27,32 @@ class StatisticsProcess;
 // Statistics are exposed via JSON for external visibility.
 extern Statistics* statistics;
 
-const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
 
-// Provides an in-memory time series of statistics over some window
-// (values are truncated outside of the window, but no limit is
-// currently placed on the number of values within a window).
-//
-// TODO(bmahler): Time series granularity should be coarsened over
-// time. This means, for high-frequency statistics, we keep a lot of
-// recent data points (fine granularity), and keep fewer older data
-// points (coarse granularity). The tunable bit here could be the
-// total number of data points to keep around, which informs how
-// often to delete older data points, while still keeping a window
-// worth of data.
+// Default statistic configuration variables.
+// TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
+// which these duration constants were not being initialized when
+// having static linkage. This issue did not manifest in newer gcc's.
+// Specifically, 4.2.1 was ok. So we've moved these to have external
+// linkage but perhaps in the future we can revert this.
+extern const Duration STATISTICS_TRUNCATION_INTERVAL;
+
+
+// Provides a collection of in-memory fixed capacity time series
+// of statistics over some window. Values are truncated when they
+// fall outside the window. "Sparsification" will occur when the
+// capacity of a time series is exceeded inside the window.
 class Statistics
 {
 public:
-  Statistics(const Duration& window);
+  Statistics(const Duration& window = TIME_SERIES_WINDOW,
+             size_t capacity = TIME_SERIES_CAPACITY);
   ~Statistics();
 
   // Returns the time series of a statistic.
-  process::Future<std::map<Time, double> > timeseries(
-      const std::string& context,
-      const std::string& name,
-      const Option<Time>& start = None(),
-      const Option<Time>& stop = None());
-
-  // Returns the latest value of a statistic.
-  process::Future<Option<double> > get(
+  process::Future<TimeSeries<double> > timeseries(
       const std::string& context,
       const std::string& name);
 
-  // Returns the latest values of all statistics in the context.
-  process::Future<std::map<std::string, double> > get(
-      const std::string& context);
-
   // Sets the current value of a statistic at the current clock time
   // or at a specified time.
   void set(
@@ -69,13 +61,6 @@ public:
       double value,
       const Time& time = Clock::now());
 
-  // Archives the provided statistic time series.
-  // This means two things:
-  //   1. The statistic will no longer be part of the snapshot.
-  //   2. However, the time series will be retained until the window
-  //      expiration.
-  void archive(const std::string& context, const std::string& name);
-
   // Increments the current value of a statistic. If no statistic was
   // previously present, an initial value of 0.0 is used.
   void increment(const std::string& context, const std::string& name);

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/include/process/timeseries.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/timeseries.hpp b/3rdparty/libprocess/include/process/timeseries.hpp
new file mode 100644
index 0000000..b368b9b
--- /dev/null
+++ b/3rdparty/libprocess/include/process/timeseries.hpp
@@ -0,0 +1,252 @@
+#ifndef __PROCESS_TIMESERIES_HPP__
+#define __PROCESS_TIMESERIES_HPP__
+
+#include <algorithm> // For max.
+#include <map>
+#include <vector>
+
+#include <process/clock.hpp>
+#include <process/time.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+
+namespace process {
+
+// Default statistic configuration variables.
+// TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
+// which these duration constants were not being initialized when
+// having static linkage. This issue did not manifest in newer gcc's.
+// Specifically, 4.2.1 was ok. So we've moved these to have external
+// linkage but perhaps in the future we can revert this.
+extern const Duration TIME_SERIES_WINDOW;
+extern const size_t TIME_SERIES_CAPACITY;
+
+
+// Provides an in-memory time series of statistics over some window.
+// When the time series capacity is exceeded within the window, the
+// granularity of older values is coarsened. This means, for
+// high-frequency statistics that exceed the capacity, we keep a lot
+// of recent data points (fine granularity), and keep fewer older
+// data points (coarse granularity). The tunable bit here is the
+// total number of data points to keep around, which informs how
+// often to delete older data points, while still keeping a window
+// worth of data.
+// TODO(bmahler): Investigate using Google's btree implementation.
+// This provides better insertion and lookup performance for large
+// containers. This _should_ also provide significant memory
+// savings. These are true because we have the following properties:
+//   1. Our insertion order will mostly be in sorted order.
+//   2. Our keys (Seconds) have efficient comparison operators.
+// See: http://code.google.com/p/cpp-btree/
+//      http://code.google.com/p/cpp-btree/wiki/UsageInstructions
+template <typename T>
+struct TimeSeries
+{
+  TimeSeries(const Duration& _window = TIME_SERIES_WINDOW,
+             size_t _capacity = TIME_SERIES_CAPACITY)
+    : window(_window),
+      // The truncation technique requires at least 3 elements.
+      capacity(std::max((size_t) 3, _capacity)) {}
+
+  struct Value
+  {
+    Value(const Time& _time, const T& _data) : time(_time), data(_data) {}
+
+    // Non-const for assignability.
+    Time time;
+    T data;
+  };
+
+  void set(const T& value, const Time& time = Clock::now())
+  {
+    // If we're not inserting at the end of the time series, then
+    // we have to reset the sparsification index. Given that
+    // out-of-order insertion is a rare use-case. This is a simple way
+    // to keep insertions O(log(n)). No need to figure out how to
+    // adjust the truncation index.
+    if (!values.empty() && time < values.rbegin()->first) {
+      index = None();
+    }
+
+    values[time] = value;
+    truncate();
+    sparsify();
+  }
+
+  // Returns the time series within the (optional) time range.
+  std::vector<Value> get(
+      const Option<Time>& start = None(),
+      const Option<Time>& stop = None()) const
+  {
+    // Ignore invalid ranges.
+    if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
+      return std::vector<Value>();
+    }
+
+    typename std::map<Time, T>::const_iterator lower = values.lower_bound(
+        start.isSome() ? start.get() : Time::EPOCH);
+
+    typename std::map<Time, T>::const_iterator upper = values.upper_bound(
+        stop.isSome() ? stop.get() : Time::MAX);
+
+    std::vector<Value> values;
+    while (lower != upper) {
+      values.push_back(Value(lower->first, lower->second));
+      ++lower;
+    }
+    return values;
+  }
+
+  Option<Value> latest() const
+  {
+    if (empty()) {
+      return None();
+    }
+
+    return Value(values.rbegin()->first, values.rbegin()->second);
+  }
+
+  bool empty() const { return values.empty(); }
+
+  // Removes values outside the time window. This will ensure at
+  // least one value remains. Note that this is called automatically
+  // when writing to the time series, so this is only needed when
+  // one wants to explicitly trigger a truncation.
+  void truncate()
+  {
+    Time expired = Clock::now() - window;
+    typename std::map<Time, T>::iterator upper_bound =
+      values.upper_bound(expired);
+
+    // Ensure at least 1 value remains.
+    if (values.size() <= 1 || upper_bound == values.end()) {
+      return;
+    }
+
+    // When truncating and there exists a next value considered
+    // for sparsification, there are two cases to consider for
+    // updating the index:
+    //
+    // Case 1: upper_bound < next
+    //   ----------------------------------------------------------
+    //       upper_bound index, next
+    //                 v v
+    //   Before: 0 1 2 3 4 5 6 7 ...
+    //   ----------------------------------------------------------
+    //                 next  index    After truncating, index is
+    //                   v     v      must be adjusted:
+    //   Truncate:     3 4 5 6 7 ...  index -= # elements removed
+    //   ----------------------------------------------------------
+    //              index, next
+    //                   v
+    //   After:        3 4 5 6 7 ...
+    //   ----------------------------------------------------------
+    //
+    // Case 2: upper_bound >= next
+    //   ----------------------------------------------------------
+    //                   upper_bound, index, next
+    //                   v
+    //   Before: 0 1 2 3 4 5 6 7 ...
+    //   ----------------------------------------------------------
+    //                               After truncating, we must
+    //   After:          4 5 6 7 ... reset index to None().
+    //   ----------------------------------------------------------
+    if (index.isSome() && upper_bound->first < next->first) {
+      size_t size = values.size();
+      values.erase(values.begin(), upper_bound);
+      index = index.get() - (size - values.size());
+    } else {
+      index = None();
+      values.erase(values.begin(), upper_bound);
+    }
+  }
+
+private:
+  // Performs "sparsification" to limit the size of the time series
+  // to be within the capacity.
+  //
+  // The sparsifying technique is to iteratively halve the granularity
+  // of the older half of the time series. Once sparsification reaches
+  // the midpoint of the time series, it begins again from the
+  // beginning.
+  //
+  // Sparsification results in the following granularity over time:
+  // Initial: | ------------------------ A -------------------- |
+  // Stage 1: | ------- 1/2 A ---------- | -------- B --------- |
+  // Stage 2: | -- 1/4A --- | -- 1/2B -- | -------- C --------- |
+  // Stage 3: | 1/8A | 1/4B | -- 1/2C -- | -------- D --------- |
+  //     ...
+  //
+  // Each stage halves the size and granularity of time series prior
+  // to sparsifying.
+  void sparsify()
+  {
+    // We remove every other element up to the halfway point of the
+    // time series, until we're within the capacity. If we reach the
+    // half-way point of the time series, we'll start another
+    // sparsification cycle from the beginning, for example:
+    //
+    // next             Time series with a capacity of 7.
+    //   v              Initial state with 7 entries
+    // 0 1 2 3 4 5 6
+    //
+    //   next           Insert '7'.
+    //     v            Capacity is exceeded, we remove '1' and
+    // 0 2 3 4 5 6 7    advance to remove '3' next.
+    //
+    //     next         Insert '8'.
+    //       v          Capacity is exceeded, we remove '3' and
+    // 0 2 4 5 6 7 8    advance to remove '5' next.
+    //
+    // next             Insert '9'.
+    //   v              Capacity is exceeded, we remove '5' and now
+    // 0 2 4 6 7 8 9    '7' is past the halfway mark, so we will reset
+    //                  reset to the beginning and consider '2'.
+
+    while (values.size() > capacity) {
+      // If the index is uninitialized, or past the half-way point,
+      // we set it back to the beginning.
+      if (index.isNone() || index.get() > values.size() / 2) {
+        // The second element is the initial deletion candidate.
+        next = values.begin();
+        ++next;
+        index = 1;
+      }
+
+#if __cplusplus >= 201103L
+      next = values.erase(next);
+      next++; // Skip one element.
+#else
+      // Store the next deletion candidate.
+      typename std::map<Time, T>::iterator copy = next;
+      copy++; // Skip every other element.
+      copy++; // Next deletion candidate.
+
+      values.erase(next);
+      next = copy;
+#endif
+      index = index.get() + 1;
+    }
+  }
+
+  // Non-const for assignability.
+  Duration window;
+  size_t capacity;
+
+  // We use a map instead of a hashmap to store the values because
+  // that way we can retrieve a series in sorted order efficiently.
+  std::map<Time, T> values;
+
+  // Next deletion candidate. We store both the iterator and index.
+  // The index is None initially, and whenever a value is appended
+  // out-of-order. This means 'next' is only valid when 'index' is
+  // Some.
+  typename std::map<Time, T>::iterator next;
+  Option<size_t> index;
+};
+
+} // namespace process {
+
+#endif // __PROCESS_TIMESERIES_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 936f118..bc7a1c5 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -593,6 +593,7 @@ ThreadLocal<Executor>* _executor_ = new ThreadLocal<Executor>();
 
 const Duration LIBPROCESS_STATISTICS_WINDOW = Days(1);
 
+
 // We namespace the clock related variables to keep them well
 // named. In the future we'll probably want to associate a clock with
 // a specific ProcessManager/SocketManager instance pair, so this will

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/statistics.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/statistics.cpp b/3rdparty/libprocess/src/statistics.cpp
index 07990d1..75aac40 100644
--- a/3rdparty/libprocess/src/statistics.cpp
+++ b/3rdparty/libprocess/src/statistics.cpp
@@ -4,7 +4,6 @@
 #include <list>
 #include <map>
 #include <string>
-#include <utility>
 #include <vector>
 
 #include <process/clock.hpp>
@@ -42,46 +41,27 @@ namespace process {
 // This is initialized by process::initialize().
 Statistics* statistics = NULL;
 
-// TODO(bmahler): Move time series related logic into this struct.
-// TODO(bmahler): Investigate using Google's sparse_hash_map.
-// Also investigate using Google's btree implementation.
-// This provides better insertion and lookup performance for large
-// containers. This _should_ also provide significant memory
-// savings, especially since:
-//   1. Our insertion order will mostly be in sorted order.
-//   2. Our keys (Seconds) have efficient comparison operators.
-// See: http://code.google.com/p/cpp-btree/
-//      http://code.google.com/p/cpp-btree/wiki/UsageInstructions
-struct TimeSeries
-{
-  TimeSeries() : values(), archived(false) {}
-
-  // We use a map instead of a hashmap to store the values because
-  // that way we can retrieve a series in sorted order efficiently.
-  map<Time, double> values;
-  bool archived;
-};
+const Duration STATISTICS_TRUNCATION_INTERVAL = Minutes(5);
 
+// TODO(bmahler): Move these into timeseries.hpp header once we
+// can require gcc >= 4.2.1.
+const Duration TIME_SERIES_WINDOW = Weeks(2);
+const size_t TIME_SERIES_CAPACITY = 1000;
 
 class StatisticsProcess : public Process<StatisticsProcess>
 {
 public:
-  StatisticsProcess(const Duration& _window)
+  StatisticsProcess(const Duration& _window, size_t _capacity)
     : ProcessBase("statistics"),
-      window(_window) {}
+      window(_window),
+      capacity(_capacity) {}
 
   virtual ~StatisticsProcess() {}
 
   // Statistics implementation.
-  map<Time, double> timeseries(
+  TimeSeries<double> timeseries(
       const string& context,
-      const string& name,
-      const Option<Time>& start,
-      const Option<Time>& stop);
-
-  Option<double> get(const string& context, const string& name);
-
-  map<string, double> get(const string& context);
+      const string& name);
 
   void set(
       const string& context,
@@ -89,8 +69,6 @@ public:
       double value,
       const Time& time);
 
-  void archive(const string& context, const string& name);
-
   void increment(const string& context, const string& name);
 
   void decrement(const string& context, const string& name);
@@ -109,18 +87,9 @@ private:
   static const string SNAPSHOT_HELP;
   static const string SERIES_HELP;
 
-  // Removes values for the specified statistic that occurred outside
-  // the time series window.
-  // NOTE: We always ensure there is at least 1 value left for a statistic,
-  // unless it is archived!
-  // Returns true iff the time series is empty.
-  bool truncate(const string& context, const string& name);
-
   // Removes values for all statistics that occurred outside the time
-  // series window.
+  // series window. We always ensure 1 value remains.
   // NOTE: Runs periodically every STATISTICS_TRUNCATION_INTERVAL.
-  // NOTE: We always ensure there is at least 1 value left for a statistic,
-  // unless it is archived.
   void truncate();
 
   // Returns the a snapshot of all statistics in JSON.
@@ -130,9 +99,10 @@ private:
   Future<Response> series(const Request& request);
 
   const Duration window;
+  const size_t capacity;
 
   // This maps from {context: {name: TimeSeries } }.
-  hashmap<string, hashmap<string, TimeSeries> > statistics;
+  hashmap<string, hashmap<string, TimeSeries<double> > > statistics;
 };
 
 
@@ -162,58 +132,15 @@ const string StatisticsProcess::SNAPSHOT_HELP = HELP(
         ">        param=VALUE          Some description here"));
 
 
-map<Time, double> StatisticsProcess::timeseries(
+TimeSeries<double> StatisticsProcess::timeseries(
     const string& context,
-    const string& name,
-    const Option<Time>& start,
-    const Option<Time>& stop)
+    const string& name)
 {
   if (!statistics.contains(context) || !statistics[context].contains(name)) {
-    return map<Time, double>();
+    return TimeSeries<double>();
   }
 
-  const std::map<Time, double>& values =
-    statistics[context].find(name)->second.values;
-
-  map<Time, double>::const_iterator lower = values.lower_bound(start.isSome()
-      ? start.get() : Time::EPOCH);
-
-  map<Time, double>::const_iterator upper = values.upper_bound(stop.isSome()
-      ? stop.get() : Time::MAX);
-
-  return map<Time, double>(lower, upper);
-}
-
-
-Option<double> StatisticsProcess::get(const string& context, const string& name)
-{
-  if (!statistics.contains(context) ||
-      !statistics[context].contains(name) ||
-      statistics[context][name].values.empty()) {
-    return Option<double>::none();
-  } else {
-    return statistics[context][name].values.rbegin()->second;
-  }
-}
-
-
-map<string, double> StatisticsProcess::get(const string& context)
-{
-  map<string, double> results;
-
-  if (!statistics.contains(context)) {
-    return results;
-  }
-
-  foreachkey (const string& name, statistics[context]) {
-    const map<Time, double>& values = statistics[context][name].values;
-
-    if (!values.empty()) {
-      results[name] = values.rbegin()->second;
-    }
-  }
-
-  return results;
+  return statistics[context][name];
 }
 
 
@@ -223,25 +150,19 @@ void StatisticsProcess::set(
     double value,
     const Time& time)
 {
-  statistics[context][name].values[time] = value; // Update the raw value.
-  statistics[context][name].archived = false;     // Unarchive.
-
-  truncate(context, name);
-}
-
-
-void StatisticsProcess::archive(const string& context, const string& name)
-{
-  // Exclude the statistic from the snapshot.
-  statistics[context][name].archived = true;
+  if (!statistics[context].contains(name)) {
+    statistics[context][name] = TimeSeries<double>(window, capacity);
+  }
+  statistics[context][name].set(value, time);
 }
 
 
 void StatisticsProcess::increment(const string& context, const string& name)
 {
   double value = 0.0;
-  if (!statistics[context][name].values.empty()) {
-    value = statistics[context][name].values.rbegin()->second;
+  if (statistics[context].contains(name) &&
+      !statistics[context][name].empty()) {
+    value = statistics[context][name].latest().get().data;
   }
   set(context, name, value + 1.0, Clock::now());
 }
@@ -250,59 +171,19 @@ void StatisticsProcess::increment(const string& context, const string& name)
 void StatisticsProcess::decrement(const string& context, const string& name)
 {
   double value = 0.0;
-  if (!statistics[context][name].values.empty()) {
-    value = statistics[context][name].values.rbegin()->second;
+  if (statistics[context].contains(name) &&
+      !statistics[context][name].empty()) {
+    value = statistics[context][name].latest().get().data;
   }
   set(context, name, value - 1.0, Clock::now());
 }
 
 
-bool StatisticsProcess::truncate(const string& context, const string& name)
-{
-  CHECK(statistics.contains(context));
-  CHECK(statistics[context].contains(name));
-
-  if (statistics[context][name].values.empty()) {
-    return true; // No truncation is needed, the time series is already empty.
-  }
-
-  map<Time, double>::iterator start =
-    statistics[context][name].values.begin();
-
-  while ((Clock::now() - start->first) > window) {
-    // Always keep at least one value for a statistic, unless it's archived!
-    if (statistics[context][name].values.size() == 1) {
-      if (statistics[context][name].archived) {
-        statistics[context][name].values.clear();
-      }
-      break;
-    }
-
-    statistics[context][name].values.erase(start);
-    start = statistics[context][name].values.begin();
-  }
-
-  return statistics[context][name].values.empty();
-}
-
-
 void StatisticsProcess::truncate()
 {
-  hashmap<string, hashset<string> > empties;
-
   foreachkey (const string& context, statistics) {
     foreachkey (const string& name, statistics[context]) {
-      // Keep track of the emptied timeseries.
-      if (truncate(context, name)) {
-        empties[context].insert(name);
-      }
-    }
-  }
-
-  // Remove the empty timeseries.
-  foreachkey (const string& context, empties) {
-    foreach (const string& name, empties[context]) {
-      statistics[context].erase(name);
+      statistics[context][name].truncate();
     }
   }
 
@@ -319,12 +200,6 @@ Future<Response> StatisticsProcess::snapshot(const Request& request)
 
   foreachkey (const string& context, statistics) {
     foreachkey (const string& name, statistics[context]) {
-      // Exclude archived and empty time series.
-      if (statistics[context][name].archived ||
-          statistics[context][name].values.empty()) {
-        continue;
-      }
-
       // Skip statistics that don't match the query, if present.
       if (queryContext.isSome() && queryContext.get() != context) {
         continue;
@@ -332,14 +207,17 @@ Future<Response> StatisticsProcess::snapshot(const Request& request)
         continue;
       }
 
-      JSON::Object object;
-      object.values["context"] = context;
-      object.values["name"] = name;
-      object.values["time"] =
-        statistics[context][name].values.rbegin()->first.secs();
-      object.values["value"] =
-        statistics[context][name].values.rbegin()->second;
-      array.values.push_back(object);
+      const Option<TimeSeries<double>::Value>& value =
+        statistics[context][name].latest();
+
+      if (value.isSome()) {
+        JSON::Object object;
+        object.values["context"] = context;
+        object.values["name"] = name;
+        object.values["time"] = value.get().time.secs();
+        object.values["value"] = value.get().data;
+        array.values.push_back(object);
+      }
     }
   }
 
@@ -364,12 +242,12 @@ Future<Response> StatisticsProcess::series(const Request& request)
   if (request.query.get("start").isSome()) {
     Try<double> result = numify<double>(request.query.get("start").get());
     if (result.isError()) {
-      return BadRequest("Failed to parse 'start': " + result.error());
+      return BadRequest("Failed to parse 'start': " + result.error() + "\n.");
     }
 
     Try<Time> start_ = Time::create(result.get());
     if (start_.isError()) {
-      return BadRequest("Failed to parse 'start': " + start_.error());
+      return BadRequest("Failed to parse 'start': " + start_.error() + "\n.");
     }
     start = start_.get();
   }
@@ -377,25 +255,29 @@ Future<Response> StatisticsProcess::series(const Request& request)
   if (request.query.get("stop").isSome()) {
     Try<double> result = numify<double>(request.query.get("stop").get());
     if (result.isError()) {
-      return BadRequest("Failed to parse 'stop': " + result.error());
+      return BadRequest("Failed to parse 'stop': " + result.error() + "\n.");
     }
 
     Try<Time> stop_ = Time::create(result.get());
     if (stop_.isError()) {
-      return BadRequest("Failed to parse 'stop': " + stop_.error());
+      return BadRequest("Failed to parse 'stop': " + stop_.error() + "\n.");
     }
     stop = stop_.get();
   }
 
+  if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
+    return BadRequest("Invalid query: 'start' must be less than 'stop'\n.");
+  }
+
   JSON::Array array;
 
-  const map<Time, double>& values =
-    timeseries(context.get(), name.get(), start, stop);
+  const vector<TimeSeries<double>::Value>& values =
+    timeseries(context.get(), name.get()).get(start, stop);
 
-  foreachpair (const Time& s, double value, values) {
+  foreach (const TimeSeries<double>::Value& value, values) {
     JSON::Object object;
-    object.values["time"] = s.secs();
-    object.values["value"] = value;
+    object.values["time"] = value.time.secs();
+    object.values["value"] = value.data;
     array.values.push_back(object);
   }
 
@@ -403,9 +285,9 @@ Future<Response> StatisticsProcess::series(const Request& request)
 }
 
 
-Statistics::Statistics(const Duration& window)
+Statistics::Statistics(const Duration& window, size_t capacity)
 {
-  process = new StatisticsProcess(window);
+  process = new StatisticsProcess(window, capacity);
   spawn(process);
 }
 
@@ -417,28 +299,11 @@ Statistics::~Statistics()
 }
 
 
-Future<map<Time, double> > Statistics::timeseries(
-    const string& context,
-    const string& name,
-    const Option<Time>& start,
-    const Option<Time>& stop)
-{
-  return dispatch(
-      process, &StatisticsProcess::timeseries, context, name, start, stop);
-}
-
-
-Future<Option<double> > Statistics::get(
+Future<TimeSeries<double> > Statistics::timeseries(
     const string& context,
     const string& name)
 {
-  return dispatch(process, &StatisticsProcess::get, context, name);
-}
-
-
-Future<map<string, double> > Statistics::get(const string& context)
-{
-  return dispatch(process, &StatisticsProcess::get, context);
+  return dispatch(process, &StatisticsProcess::timeseries, context, name);
 }
 
 
@@ -452,12 +317,6 @@ void Statistics::set(
 }
 
 
-void Statistics::archive(const string& context, const string& name)
-{
-  dispatch(process, &StatisticsProcess::archive, context, name);
-}
-
-
 void Statistics::increment(const string& context, const string& name)
 {
   dispatch(process, &StatisticsProcess::increment, context, name);

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/tests/statistics_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/statistics_tests.cpp b/3rdparty/libprocess/src/tests/statistics_tests.cpp
index 1acf47c..1382653 100644
--- a/3rdparty/libprocess/src/tests/statistics_tests.cpp
+++ b/3rdparty/libprocess/src/tests/statistics_tests.cpp
@@ -1,18 +1,28 @@
 #include <gmock/gmock.h>
 
-#include <map>
-
 #include <process/clock.hpp>
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/gtest.hpp>
 #include <process/statistics.hpp>
 #include <process/time.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/foreach.hpp>
+#include <stout/list.hpp>
 
 using namespace process;
 
-using std::map;
+
+// Overload for testing time series equality.
+bool operator == (const List<double>& list, const TimeSeries<double>& series)
+{
+  List<double> result;
+  foreach (const TimeSeries<double>::Value& value, series.get()) {
+    result.push_back(value.data);
+  }
+  return list == result;
+}
 
 
 TEST(Statistics, set)
@@ -26,94 +36,59 @@ TEST(Statistics, set)
   Time now = Clock::now();
   statistics.set("test", "statistic", 4.0, now);
 
-  Future<map<Time, double> > values =
+  Future<TimeSeries<double> > values =
     statistics.timeseries("test", "statistic");
 
   AWAIT_ASSERT_READY(values);
 
-  EXPECT_EQ(2, values.get().size());
+  EXPECT_EQ(2, values.get().get().size());
 
-  EXPECT_GE(Clock::now(), values.get().begin()->first);
-  EXPECT_DOUBLE_EQ(3.0, values.get().begin()->second);
+  EXPECT_GE(Clock::now(), values.get().get().begin()->time);
+  EXPECT_DOUBLE_EQ(3.0, values.get().get().begin()->data);
 
-  EXPECT_EQ(1, values.get().count(now));
-  EXPECT_DOUBLE_EQ(4.0, values.get()[now]);
+  EXPECT_EQ(List<double>(3.0, 4.0), values.get());
 }
 
 
-TEST(Statistics, truncate)
+TEST(Statistics, increment)
 {
-  Clock::pause();
-
-  Statistics statistics(Days(1));
-
-  statistics.set("test", "statistic", 3.0);
-
-  Future<map<Time, double> > values =
-    statistics.timeseries("test", "statistic");
+  Statistics statistics;
+  Future<TimeSeries<double> > values;
 
+  statistics.increment("test", "statistic");
+  values = statistics.timeseries("test", "statistic");
   AWAIT_ASSERT_READY(values);
-
-  EXPECT_EQ(1, values.get().size());
-  EXPECT_GE(Clock::now(), values.get().begin()->first);
-  EXPECT_DOUBLE_EQ(3.0, values.get().begin()->second);
-
-  Clock::advance(Days(1) + Seconds(1));
-  Clock::settle();
+  EXPECT_EQ(List<double>(1.0), values.get());
 
   statistics.increment("test", "statistic");
-
   values = statistics.timeseries("test", "statistic");
-
   AWAIT_ASSERT_READY(values);
+  EXPECT_EQ(List<double>(1.0, 2.0), values.get());
 
-  EXPECT_EQ(1, values.get().size());
-  EXPECT_GE(Clock::now(), values.get().begin()->first);
-  EXPECT_DOUBLE_EQ(4.0, values.get().begin()->second);
-
-  Clock::resume();
+  statistics.increment("test", "statistic");
+  values = statistics.timeseries("test", "statistic");
+  AWAIT_ASSERT_READY(values);
+  EXPECT_EQ(List<double>(1.0, 2.0, 3.0), values.get());
 }
 
 
-TEST(Statistics, archive)
+TEST(Statistics, decrement)
 {
-  Clock::pause();
-
-  Statistics statistics(Seconds(10));
-
-  Time now = Clock::now();
-  statistics.set("test", "statistic", 1.0, now);
-  statistics.set("test", "statistic", 2.0, Time(now + Seconds(1)));
+  Statistics statistics;
+  Future<TimeSeries<double> > values;
 
-  // Archive and ensure the following:
-  //   1. The statistic will no longer be part of the snapshot.
-  //   2. However, the time series will be retained until the window
-  //      expiration.
-  statistics.archive("test", "statistic");
-
-  // TODO(bmahler): Wait for JSON parsing to verify number 1.
-
-  // Ensure the raw time series is present.
-  Future<map<Time, double> > values =
-    statistics.timeseries("test", "statistic");
+  statistics.decrement("test", "statistic");
+  values = statistics.timeseries("test", "statistic");
   AWAIT_ASSERT_READY(values);
-  EXPECT_FALSE(values.get().empty());
+  EXPECT_EQ(List<double>(-1.0), values.get());
 
-  // Expire the window and ensure the statistics were removed.
-  Clock::advance(STATISTICS_TRUNCATION_INTERVAL);
-  Clock::settle();
-
-  // Ensure the raw statistics are gone.
+  statistics.decrement("test", "statistic");
   values = statistics.timeseries("test", "statistic");
   AWAIT_ASSERT_READY(values);
-  EXPECT_TRUE(values.get().empty());
-
-  // Reactivate the statistic, and make sure we can retrieve it.
-  statistics.set("test", "statistic", 1.0, now);
+  EXPECT_EQ(List<double>(-1.0, -2.0), values.get());
 
+  statistics.decrement("test", "statistic");
   values = statistics.timeseries("test", "statistic");
   AWAIT_ASSERT_READY(values);
-  EXPECT_FALSE(values.get().empty());
-
-  Clock::resume();
+  EXPECT_EQ(List<double>(-1.0, -2.0, -3.0), values.get());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/157e0118/3rdparty/libprocess/src/tests/timeseries_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/timeseries_tests.cpp b/3rdparty/libprocess/src/tests/timeseries_tests.cpp
new file mode 100644
index 0000000..0fa966e
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/timeseries_tests.cpp
@@ -0,0 +1,191 @@
+#include <gmock/gmock.h>
+
+#include <process/clock.hpp>
+#include <process/timeseries.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/gtest.hpp>
+#include <stout/list.hpp>
+
+using namespace process;
+
+
+// Overload for testing time series equality.
+bool operator == (const List<int>& list, const TimeSeries<int>& series)
+{
+  List<int> result;
+  foreach (const TimeSeries<int>::Value& value, series.get()) {
+    result.push_back(value.data);
+  }
+  return list == result;
+}
+
+
+TEST(TimeSeries, set)
+{
+  TimeSeries<int> series;
+
+  ASSERT_TRUE(series.empty());
+
+  series.set(1);
+
+  ASSERT_FALSE(series.empty());
+
+  const Option<TimeSeries<int>::Value> latest = series.latest();
+
+  ASSERT_SOME(latest);
+  ASSERT_EQ(1, latest.get().data);
+}
+
+TEST(TimeSeries, sparsify)
+{
+  // Create a time series and fill it to its capacity.
+  TimeSeries<int> series(Duration::max(), 10);
+
+  series.set(0);
+  series.set(1);
+  series.set(2);
+  series.set(3);
+  series.set(4);
+  series.set(5);
+  series.set(6);
+  series.set(7);
+  series.set(8);
+  series.set(9);
+
+  ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+  // Verify the sparsification pattern.
+  series.set(10);
+  ASSERT_EQ(List<int>(0, 2, 3, 4, 5, 6, 7, 8, 9, 10), series);
+
+  series.set(11);
+  ASSERT_EQ(List<int>(0, 2, 4, 5, 6, 7, 8, 9, 10, 11), series);
+
+  series.set(12);
+  ASSERT_EQ(List<int>(0, 2, 4, 6, 7, 8, 9, 10, 11, 12), series);
+
+  series.set(13);
+  ASSERT_EQ(List<int>(0, 2, 4, 6, 8, 9, 10, 11, 12, 13), series);
+
+  series.set(14);
+  ASSERT_EQ(List<int>(0, 2, 4, 6, 8, 10, 11, 12, 13, 14), series);
+
+  // Now we expect a new round of sparsification to occur, starting
+  // again from the beginning.
+  series.set(15);
+  ASSERT_EQ(List<int>(0, 4, 6, 8, 10, 11, 12, 13, 14, 15), series);
+
+  series.set(16);
+  ASSERT_EQ(List<int>(0, 4, 8, 10, 11, 12, 13, 14, 15, 16), series);
+
+  series.set(17);
+  ASSERT_EQ(List<int>(0, 4, 8, 11, 12, 13, 14, 15, 16, 17), series);
+
+  series.set(18);
+  ASSERT_EQ(List<int>(0, 4, 8, 11, 13, 14, 15, 16, 17, 18), series);
+
+  series.set(19);
+  ASSERT_EQ(List<int>(0, 4, 8, 11, 13, 15, 16, 17, 18, 19), series);
+}
+
+
+TEST(TimeSeries, truncate)
+{
+  // Test simple truncation first.
+  Clock::pause();
+  Time now = Clock::now();
+
+  // Create a time series and fill it to its capacity.
+  TimeSeries<int> series(Seconds(10), 10);
+
+  series.set(0, now);
+  series.set(1, now + Seconds(1));
+  series.set(2, now + Seconds(2));
+  series.set(3, now + Seconds(3));
+  series.set(4, now + Seconds(4));
+  series.set(5, now + Seconds(5));
+  series.set(6, now + Seconds(6));
+  series.set(7, now + Seconds(7));
+  series.set(8, now + Seconds(8));
+  series.set(9, now + Seconds(9));
+
+  ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+  // Cause the first 6 tasks to be truncated from the window.
+  Clock::advance(Seconds(10 + 6));
+  series.set(10, now + Seconds(10));
+
+  ASSERT_EQ(List<int>(7, 8, 9, 10), series);
+
+  Clock::resume();
+
+
+  // Now test truncation in the face of sparsification.
+  Clock::pause();
+  now = Clock::now();
+
+  series = TimeSeries<int>(Seconds(10), 10);
+
+  series.set(0, now);
+  series.set(1, now + Seconds(1));
+  series.set(2, now + Seconds(2));
+  series.set(3, now + Seconds(3));
+  series.set(4, now + Seconds(4));
+  series.set(5, now + Seconds(5));
+  series.set(6, now + Seconds(6));
+  series.set(7, now + Seconds(7));
+  series.set(8, now + Seconds(8));
+  series.set(9, now + Seconds(9));
+
+  ASSERT_EQ(List<int>(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), series);
+
+  // Move the sparsification candidate forward to ensure sparsification
+  // is correct after a truncation occurs.
+  series.set(10, now + Seconds(10));
+  ASSERT_EQ(List<int>(0, 2, 3, 4, 5, 6, 7, 8, 9, 10), series);
+
+  series.set(11, now + Seconds(11));
+  ASSERT_EQ(List<int>(0, 2, 4, 5, 6, 7, 8, 9, 10, 11), series);
+
+  series.set(12, now + Seconds(12));
+  ASSERT_EQ(List<int>(0, 2, 4, 6, 7, 8, 9, 10, 11, 12), series);
+
+  // Now the next sparsification candidate is '7'. First, we will
+  // truncate exluding '7' and ensure sparsification proceeds as
+  // expected.
+  Clock::advance(Seconds(10 + 2));
+  series.truncate();
+  ASSERT_EQ(List<int>(4, 6, 7, 8, 9, 10, 11, 12), series);
+
+  // Add 2 more items to return to capacity.
+  series.set(13, now + Seconds(13));
+  series.set(14, now + Seconds(14));
+  ASSERT_EQ(List<int>(4, 6, 7, 8, 9, 10, 11, 12, 13, 14), series);
+
+  // Now cause the time series to exceed capacity and ensure we
+  // correctly remove '7'.
+  series.set(15, now + Seconds(15));
+  ASSERT_EQ(List<int>(4, 6, 8, 9, 10, 11, 12, 13, 14, 15), series);
+
+  // Finally, let's truncate into the next sparsification candidate
+  // '9', and ensure sparsification is reset.
+  Clock::advance(Seconds(7)); // 2 + 7 = 9.
+  series.truncate();
+  ASSERT_EQ(List<int>(10, 11, 12, 13, 14, 15), series);
+
+  // Get back to capacity and ensure sparsification starts from the
+  // beginning.
+  series.set(16, now + Seconds(16));
+  series.set(17, now + Seconds(17));
+  series.set(18, now + Seconds(18));
+  series.set(19, now + Seconds(19));
+  ASSERT_EQ(List<int>(10, 11, 12, 13, 14, 15, 16, 17, 18, 19), series);
+
+  // Did we sparsify from the beginning?
+  series.set(20, now + Seconds(20));
+  ASSERT_EQ(List<int>(10, 12, 13, 14, 15, 16, 17, 18, 19, 20), series);
+
+  // Done!
+  Clock::resume();
+}