You are viewing a plain-text version of this content. The canonical link for it (a hyperlink labeled "here" on the original page) is not preserved in this plain-text view.
Posted to commits@datasketches.apache.org by al...@apache.org on 2024/01/10 21:37:43 UTC

(datasketches-cpp) branch tdigest updated: implemented merge

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch tdigest
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git


The following commit(s) were added to refs/heads/tdigest by this push:
     new 1a9667b  implemented merge
1a9667b is described below

commit 1a9667ba6d01f98b7adf6f08ea9787d36b8222c8
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Wed Jan 10 13:37:33 2024 -0800

    implemented merge
---
 tdigest/include/tdigest.hpp      |  2 +-
 tdigest/include/tdigest_impl.hpp | 39 ++++++++++++++++++++++++++-------------
 tdigest/test/tdigest_test.cpp    | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 66 insertions(+), 14 deletions(-)

diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp
index 8ebd13c..1880366 100644
--- a/tdigest/include/tdigest.hpp
+++ b/tdigest/include/tdigest.hpp
@@ -192,7 +192,7 @@ private:
 
   void merge_new_values();
   void merge_new_values(bool force, uint16_t k);
-  void merge_new_values(vector_centroid& centroids, uint64_t weight, uint16_t k, bool reverse);
+  void merge_new_values(uint16_t k);  // compresses buffer_ together with the existing centroids_ at scale k
 };
 
 } /* namespace datasketches */
diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp
index 5f57e85..b6f639c 100644
--- a/tdigest/include/tdigest_impl.hpp
+++ b/tdigest/include/tdigest_impl.hpp
@@ -55,6 +55,8 @@ buffered_weight_(0)
     centroids_capacity_ = internal_k_ + fudge;
   }
   if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_;
+  centroids_.reserve(centroids_capacity_);
+  buffer_.reserve(buffer_capacity_);
 }
 
 template<typename T, typename A>
@@ -69,6 +71,18 @@ void tdigest<T, A>::update(T value) {
 
 template<typename T, typename A>
 void tdigest<T, A>::merge(tdigest& other) {
+  if (other.is_empty()) return;
+  size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size();
+  buffer_.reserve(num);  // other's buffered values and centroids become pending input in buffer_
+  std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_));
+  std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
+  buffered_weight_ += other.get_total_weight();
+  // Take other's exact extremes on every path: compression only widens
+  // min_/max_ from centroid means, which need not equal the true extremes.
+  min_ = std::min(min_, other.get_min_value());
+  max_ = std::max(max_, other.get_max_value());
+  // Compress only when the combined input no longer fits in the buffer.
+  if (num > buffer_capacity_) merge_new_values(internal_k_);
 }
 
 template<typename T, typename A>
@@ -205,28 +219,24 @@ void tdigest<T, A>::merge_new_values() {
 template<typename T, typename A>
 void tdigest<T, A>::merge_new_values(bool force, uint16_t k) {
   if (total_weight_ == 0 && buffered_weight_ == 0) return;
-  if (force || buffered_weight_ > 0) {
-    merge_new_values(buffer_, buffered_weight_, k, USE_ALTERNATING_SORT & (merge_count_ & 1));
-    ++merge_count_;
-    buffer_.clear();
-    buffered_weight_ = 0;
-  }
+  if (force || buffered_weight_ > 0) merge_new_values(k);  // with force, existing centroids are re-compressed even if nothing is buffered
 }
 
 template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64_t weight, uint16_t k, bool reverse) {
-  for (const auto& centroid: centroids_) incoming_centroids.push_back(centroid);
+void tdigest<T, A>::merge_new_values(uint16_t k) {  // sorts buffer_ plus existing centroids_ and rebuilds centroids_ at scale k
+  const bool reverse = USE_ALTERNATING_SORT & (merge_count_ & 1);  // alternate sort direction on odd merges when enabled
+  for (const auto& centroid: centroids_) buffer_.push_back(centroid);
   centroids_.clear();
-  std::stable_sort(incoming_centroids.begin(), incoming_centroids.end(), centroid_cmp(reverse));
-  total_weight_ += weight;
-  auto it = incoming_centroids.begin();
+  std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse));
+  total_weight_ += buffered_weight_;
+  auto it = buffer_.begin();
   centroids_.push_back(*it);
   ++it;
   double weight_so_far = 0;
   const double normalizer = scale_function().normalizer(k, total_weight_);
   double k1 = scale_function().k(0, normalizer);
   double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
-  while (it != incoming_centroids.end()) {
+  while (it != buffer_.end()) {
     const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
     const double projected_weight = weight_so_far + proposed_weight;
     bool add_this;
@@ -237,7 +247,7 @@ void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64
     } else {
       add_this = projected_weight <= w_limit;
     }
-    if (std::distance(incoming_centroids.begin(), it) == 1 || std::distance(incoming_centroids.end(), it) == 1) {
+    if (std::distance(buffer_.begin(), it) == 1 || std::distance(it, buffer_.end()) == 1) {  // never add the 2nd or the last item to the preceding centroid; distance(it, end) == 1 only at the last item
       add_this = false;
     }
     if (add_this) {
@@ -257,6 +267,9 @@ void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64
     min_ = std::min(min_, centroids_.front().get_mean());
     max_ = std::max(max_, centroids_.back().get_mean());
   }
+  ++merge_count_;
+  buffer_.clear();
+  buffered_weight_ = 0;
 }
 
 } /* namespace datasketches */
diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp
index e90faa7..08fbe80 100644
--- a/tdigest/test/tdigest_test.cpp
+++ b/tdigest/test/tdigest_test.cpp
@@ -108,4 +108,43 @@ TEST_CASE("rank - repeated block", "[tdigest]") {
   REQUIRE(td.get_rank(3.01) == 1);
 }
 
+TEST_CASE("merge small", "[tdigest]") {  // few enough values that merge() presumably only buffers them (no compression)
+  tdigest_double td1(10);
+  td1.update(1);
+  td1.update(2);
+  tdigest_double td2(10);
+  td2.update(2);
+  td2.update(3);
+  td1.merge(td2);
+  REQUIRE(td1.get_min_value() == 1);
+  REQUIRE(td1.get_max_value() == 3);
+  REQUIRE(td1.get_total_weight() == 4);
+  REQUIRE(td1.get_rank(0.99) == 0);
+  REQUIRE(td1.get_rank(1) == 0.125);  // midpoint rank of the single 1: (0 + 1/2) / 4
+  REQUIRE(td1.get_rank(2) == 0.5);
+  REQUIRE(td1.get_rank(3) == 0.875);
+  REQUIRE(td1.get_rank(3.01) == 1);
+}
+
+TEST_CASE("merge large", "[tdigest]") {
+  const size_t n = 10000;
+  tdigest_double td1(100);
+  tdigest_double td2(100);
+  for (size_t i = 0; i < n / 2; ++i) {
+    td1.update(i);
+    td2.update(n / 2 + i);
+  }
+  td1.merge(td2);
+  // td1 holds [0, n/2) and td2 holds [n/2, n), so the merged sketch should
+  // see all n values with exact extremes and near-uniform ranks
+  REQUIRE(td1.get_total_weight() == n);
+  REQUIRE(td1.get_min_value() == 0);
+  REQUIRE(td1.get_max_value() == n - 1);
+  REQUIRE(td1.get_rank(0) == Approx(0).margin(0.0001));
+  REQUIRE(td1.get_rank(n / 4) == Approx(0.25).margin(0.0001));
+  REQUIRE(td1.get_rank(n / 2) == Approx(0.5).margin(0.0001));
+  REQUIRE(td1.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001));
+  REQUIRE(td1.get_rank(n) == 1);
+}
+
 } /* namespace datasketches */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org