You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2024/02/13 23:15:23 UTC

(datasketches-cpp) branch tdigest updated: better naming, no forced compression

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch tdigest
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git


The following commit(s) were added to refs/heads/tdigest by this push:
     new 9bfe6e6  better naming, no forced compression
9bfe6e6 is described below

commit 9bfe6e6b761936f8eac3641e7ef1397692681a54
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Tue Feb 13 15:15:13 2024 -0800

    better naming, no forced compression
---
 tdigest/include/tdigest.hpp      |  15 +++--
 tdigest/include/tdigest_impl.hpp | 117 +++++++++++++++++++--------------------
 2 files changed, 64 insertions(+), 68 deletions(-)

diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp
index d60a396..0b7c54a 100644
--- a/tdigest/include/tdigest.hpp
+++ b/tdigest/include/tdigest.hpp
@@ -28,6 +28,8 @@
 namespace datasketches {
 
 // this is equivalent of K_2 (default) in the Java implementation mentioned below
+// Generates cluster sizes proportional to q*(1-q).
+// The use of a normalizing function results in a strictly bounded number of clusters no matter how many samples.
 struct scale_function {
   double k(double q, double normalizer) const {
     return limit([normalizer] (double q) { return std::log(q / (1 - q)) * normalizer; }, q, 1e-15, 1 - 1e-15);
@@ -99,12 +101,11 @@ public:
   using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
 
   struct centroid_cmp {
-    centroid_cmp(bool reverse): reverse_(reverse) {}
+    centroid_cmp() {}
     bool operator()(const centroid& a, const centroid& b) const {
-      if (a.get_mean() < b.get_mean()) return !reverse_;
-      return reverse_;
+      if (a.get_mean() < b.get_mean()) return true;
+      return false;
     }
-    bool reverse_;
   };
 
   /**
@@ -218,7 +219,7 @@ private:
   T max_;
   size_t centroids_capacity_;
   vector_centroid centroids_;
-  uint64_t total_weight_;
+  uint64_t centroids_weight_;
   size_t buffer_capacity_;
   vector_centroid buffer_;
   uint64_t buffered_weight_;
@@ -236,9 +237,7 @@ private:
   // for deserialize
   tdigest(bool reverse_merge, uint16_t k, T min, T max, vector_centroid&& centroids, uint64_t total_weight_, const Allocator& allocator);
 
-  void merge_new_values();
-  void merge_new_values(bool force, uint16_t k);
-  void merge_new_values(uint16_t k);
+  void merge_buffered();
 
   static double weighted_average(double x1, double w1, double x2, double w2);
 
diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp
index 1c3b345..7cfdfa7 100644
--- a/tdigest/include/tdigest_impl.hpp
+++ b/tdigest/include/tdigest_impl.hpp
@@ -36,7 +36,7 @@ tdigest(false, k, std::numeric_limits<T>::infinity(), -std::numeric_limits<T>::i
 template<typename T, typename A>
 void tdigest<T, A>::update(T value) {
   if (std::isnan(value)) return;
-  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_new_values();
+  if (buffer_.size() >= buffer_capacity_ - centroids_.size()) merge_buffered();
   buffer_.push_back(centroid(value, 1));
   ++buffered_weight_;
   min_ = std::min(min_, value);
@@ -52,7 +52,7 @@ void tdigest<T, A>::merge(tdigest& other) {
   std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
   buffered_weight_ += other.get_total_weight();
   if (num > buffer_capacity_) {
-    merge_new_values(internal_k_);
+    merge_buffered();
   } else {
     min_ = std::min(min_, other.get_min_value());
     max_ = std::max(max_, other.get_max_value());
@@ -61,7 +61,7 @@ void tdigest<T, A>::merge(tdigest& other) {
 
 template<typename T, typename A>
 void tdigest<T, A>::compress() {
-  merge_new_values(true, k_);
+  merge_buffered();
 }
 
 template<typename T, typename A>
@@ -83,7 +83,7 @@ T tdigest<T, A>::get_max_value() const {
 
 template<typename T, typename A>
 uint64_t tdigest<T, A>::get_total_weight() const {
-  return total_weight_ + buffered_weight_;
+  return centroids_weight_ + buffered_weight_;
 }
 
 template<typename T, typename A>
@@ -95,13 +95,13 @@ double tdigest<T, A>::get_rank(T value) const {
   // one centroid and value == min_ == max_
   if ((centroids_.size() + buffer_.size()) == 1) return 0.5;
 
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
 
   // left tail
   const T first_mean = centroids_.front().get_mean();
   if (value < first_mean) {
     if (first_mean - min_ > 0) {
-      if (value == min_) return 0.5 / total_weight_;
+      if (value == min_) return 0.5 / centroids_weight_;
       return (1.0 + (value - min_) / (first_mean - min_) * (centroids_.front().get_weight() / 2.0 - 1.0)); // ?
     }
     return 0; // should never happen
@@ -111,15 +111,15 @@ double tdigest<T, A>::get_rank(T value) const {
   const T last_mean = centroids_.back().get_mean();
   if (value > last_mean) {
     if (max_ - last_mean > 0) {
-      if (value == max_) return 1.0 - 0.5 / total_weight_;
-        return 1 - ((1 + (max_ - value) / (max_ - last_mean) * (centroids_.back().get_weight() / 2.0 - 1.0)) / total_weight_); // ?
+      if (value == max_) return 1.0 - 0.5 / centroids_weight_;
+        return 1 - ((1 + (max_ - value) / (max_ - last_mean) * (centroids_.back().get_weight() / 2.0 - 1.0)) / centroids_weight_); // ?
     }
     return 1; // should never happen
   }
 
-  auto lower = std::lower_bound(centroids_.begin(), centroids_.end(), centroid(value, 1), centroid_cmp(false));
+  auto lower = std::lower_bound(centroids_.begin(), centroids_.end(), centroid(value, 1), centroid_cmp());
   if (lower == centroids_.end()) throw std::logic_error("lower == end in get_rank()");
-  auto upper = std::upper_bound(lower, centroids_.end(), centroid(value, 1), centroid_cmp(false));
+  auto upper = std::upper_bound(lower, centroids_.end(), centroid(value, 1), centroid_cmp());
   if (upper == centroids_.begin()) throw std::logic_error("upper == begin in get_rank()");
   if (value < lower->get_mean()) --lower;
   if (upper == centroids_.end() || (upper != centroids_.begin() && !((upper - 1)->get_mean() < value))) --upper;
@@ -138,9 +138,9 @@ double tdigest<T, A>::get_rank(T value) const {
   weight_delta -= lower->get_weight() / 2.0;
   weight_delta += upper->get_weight() / 2.0;
   if (upper->get_mean() - lower->get_mean() > 0) {
-    return (weight_below + weight_delta * (value - lower->get_mean()) / (upper->get_mean() - lower->get_mean())) / total_weight_;
+    return (weight_below + weight_delta * (value - lower->get_mean()) / (upper->get_mean() - lower->get_mean())) / centroids_weight_;
   }
-  return (weight_below + weight_delta / 2.0) / total_weight_;
+  return (weight_below + weight_delta / 2.0) / centroids_weight_;
 }
 
 template<typename T, typename A>
@@ -149,20 +149,20 @@ T tdigest<T, A>::get_quantile(double rank) const {
   if ((rank < 0.0) || (rank > 1.0)) {
     throw std::invalid_argument("Normalized rank cannot be less than 0 or greater than 1");
   }
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   if (centroids_.size() == 1) return centroids_.front().get_mean();
 
   // at least 2 centroids
-  const double weight = rank * total_weight_;
+  const double weight = rank * centroids_weight_;
   if (weight < 1) return min_;
-  if (weight > total_weight_ - 1.0) return max_;
+  if (weight > centroids_weight_ - 1.0) return max_;
   const double first_weight = centroids_.front().get_weight();
   if (first_weight > 1 && weight < first_weight / 2.0) {
     return min_ + (weight - 1.0) / (first_weight / 2.0 - 1.0) * (centroids_.front().get_mean() - min_);
   }
   const double last_weight = centroids_.back().get_weight();
-  if (last_weight > 1 && total_weight_ - weight <= last_weight / 2.0) {
-    return max_ + (total_weight_ - weight - 1.0) / (last_weight / 2.0 - 1.0) * (max_ - centroids_.back().get_mean());
+  if (last_weight > 1 && centroids_weight_ - weight <= last_weight / 2.0) {
+    return max_ + (centroids_weight_ - weight - 1.0) / (last_weight / 2.0 - 1.0) * (max_ - centroids_.back().get_mean());
   }
 
   // interpolate between extremes
@@ -187,7 +187,7 @@ T tdigest<T, A>::get_quantile(double rank) const {
     }
     weight_so_far += dw;
   }
-  const double w1 = weight - total_weight_ - centroids_.back().get_weight() / 2.0;
+  const double w1 = weight - centroids_weight_ - centroids_.back().get_weight() / 2.0;
   const double w2 = centroids_.back().get_weight() / 2.0 - w1;
   return weighted_average(centroids_.back().get_weight(), w1, max_, w2);
 }
@@ -209,77 +209,76 @@ string<A> tdigest<T, A>::to_string(bool print_centroids) const {
   os << "   Buffered           : " << buffer_.size() << std::endl;
   os << "   Centroids capacity : " << centroids_capacity_ << std::endl;
   os << "   Buffer capacity    : " << buffer_capacity_ << std::endl;
-  os << "   Total Weight       : " << total_weight_ << std::endl;
+  os << "   Centroids Weight   : " << centroids_weight_ << std::endl;
   os << "   Buffered Weight    : " << buffered_weight_ << std::endl;
+  os << "   Total Weight       : " << get_total_weight() << std::endl;
+  os << "   Reverse Merge      : " << (reverse_merge_ ? "true" : "false") << std::endl;
   if (!is_empty()) {
     os << "   Min                : " << min_ << std::endl;
     os << "   Max                : " << max_ << std::endl;
   }
   os << "### End t-Digest summary" << std::endl;
   if (print_centroids) {
-    os << "Centroids:" << std::endl;
-    int i = 0;
-    for (auto centroid: centroids_) {
-      os << i << ": " << centroid.get_mean() << ", " << centroid.get_weight() << std::endl;
-      ++i;
+    if (centroids_.size() > 0) {
+      os << "Centroids:" << std::endl;
+      int i = 0;
+      for (const auto& c: centroids_) {
+        os << i++ << ": " << c.get_mean() << ", " << c.get_weight() << std::endl;
+      }
+    }
+    if (buffer_.size() > 0) {
+      os << "Buffer:" << std::endl;
+      int i = 0;
+      for (const auto& b: buffer_) {
+        os << i++ << ": " << b.get_mean() << ", " << b.get_weight() << std::endl;
+      }
     }
   }
   return string<A>(os.str().c_str(), allocator_);
 }
 
 template<typename T, typename A>
-void tdigest<T, A>::merge_new_values() {
-  merge_new_values(false, internal_k_);
-}
-
-template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(bool force, uint16_t k) {
-  if (total_weight_ == 0 && buffered_weight_ == 0) return;
-  if (force || buffered_weight_ > 0) merge_new_values(k);
-}
-
-template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(uint16_t k) {
-  const bool reverse = USE_ALTERNATING_SORT & reverse_merge_;
-  for (const auto& centroid: centroids_) buffer_.push_back(centroid);
+void tdigest<T, A>::merge_buffered() {
+  if (buffered_weight_ == 0) return;
+  const bool reverse = USE_ALTERNATING_SORT && reverse_merge_;
+  std::copy(centroids_.begin(), centroids_.end(), std::back_inserter(buffer_));
   centroids_.clear();
-  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse));
-  total_weight_ += buffered_weight_;
+  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp());
+  if (reverse) std::reverse(buffer_.begin(), buffer_.end());
+  centroids_weight_ += buffered_weight_;
   auto it = buffer_.begin();
   centroids_.push_back(*it);
   ++it;
   double weight_so_far = 0;
-  const double normalizer = scale_function().normalizer(k, total_weight_);
+  const double normalizer = scale_function().normalizer(internal_k_, centroids_weight_);
   double k1 = scale_function().k(0, normalizer);
-  double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
+  double w_limit = centroids_weight_ * scale_function().q(k1 + 1, normalizer);
   while (it != buffer_.end()) {
     const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
-    const double projected_weight = weight_so_far + proposed_weight;
     bool add_this;
-    if (USE_WEIGHT_LIMIT) {
-      const double q0 = weight_so_far / total_weight_;
-      const double q2 = (weight_so_far + proposed_weight) / total_weight_;
-      add_this = proposed_weight <= total_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
-    } else {
-      add_this = projected_weight <= w_limit;
-    }
     if (std::distance(buffer_.begin(), it) == 1 || std::distance(buffer_.end(), it) == 1) {
       add_this = false;
+    } else if (USE_WEIGHT_LIMIT) {
+      const double q0 = weight_so_far / centroids_weight_;
+      const double q2 = (weight_so_far + proposed_weight) / centroids_weight_;
+      add_this = proposed_weight <= centroids_weight_ * std::min(scale_function().max(q0, normalizer), scale_function().max(q2, normalizer));
+    } else {
+      add_this = weight_so_far + proposed_weight <= w_limit;
     }
     if (add_this) {
       centroids_.back().add(*it);
     } else {
       weight_so_far += centroids_.back().get_weight();
       if (!USE_WEIGHT_LIMIT) {
-        k1 = scale_function().k(weight_so_far / total_weight_, normalizer);
-        w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
+        k1 = scale_function().k(weight_so_far / centroids_weight_, normalizer);
+        w_limit = centroids_weight_ * scale_function().q(k1 + 1, normalizer);
       }
       centroids_.push_back(*it);
     }
     ++it;
   }
   if (reverse) std::reverse(centroids_.begin(), centroids_.end());
-  if (total_weight_ > 0) {
+  if (centroids_weight_ > 0) {
     min_ = std::min(min_, centroids_.front().get_mean());
     max_ = std::max(max_, centroids_.back().get_mean());
   }
@@ -295,7 +294,7 @@ double tdigest<T, A>::weighted_average(double x1, double w1, double x2, double w
 
 template<typename T, typename A>
 void tdigest<T, A>::serialize(std::ostream& os) const {
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   write(os, is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY);
   write(os, SERIAL_VERSION);
   write(os, SKETCH_TYPE);
@@ -319,7 +318,7 @@ void tdigest<T, A>::serialize(std::ostream& os) const {
 
 template<typename T, typename A>
 auto tdigest<T, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
-  const_cast<tdigest*>(this)->merge_new_values(); // side effect
+  const_cast<tdigest*>(this)->merge_buffered(); // side effect
   const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NON_EMPTY;
   const size_t size_bytes = preamble_longs * sizeof(uint64_t) + sizeof(T) * 2 + sizeof(centroid) * centroids_.size();
   vector_bytes bytes(size_bytes, 0, allocator_);
@@ -556,7 +555,7 @@ min_(min),
 max_(max),
 centroids_capacity_(0),
 centroids_(std::move(centroids)),
-total_weight_(total_weight),
+centroids_weight_(total_weight),
 buffer_capacity_(0),
 buffer_(allocator),
 buffered_weight_(0)
@@ -572,10 +571,8 @@ buffered_weight_(0)
   double scale = std::max(1.0, static_cast<double>(buffer_capacity_) / centroids_capacity_ - 1.0);
   if (!USE_TWO_LEVEL_COMPRESSION) scale = 1;
   internal_k_ = std::ceil(std::sqrt(scale) * k_);
-  if (centroids_capacity_ < internal_k_ + fudge) {
-    centroids_capacity_ = internal_k_ + fudge;
-  }
-  if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_;
+  centroids_capacity_ = std::max(centroids_capacity_, internal_k_ + fudge);
+  buffer_capacity_ = std::max(buffer_capacity_, 2 * centroids_capacity_);
   centroids_.reserve(centroids_capacity_);
   buffer_.reserve(buffer_capacity_);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org