You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2024/01/10 21:37:43 UTC
(datasketches-cpp) branch tdigest updated: implemented merge
This is an automated email from the ASF dual-hosted git repository.
alsay pushed a commit to branch tdigest
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git
The following commit(s) were added to refs/heads/tdigest by this push:
new 1a9667b implemented merge
1a9667b is described below
commit 1a9667ba6d01f98b7adf6f08ea9787d36b8222c8
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Wed Jan 10 13:37:33 2024 -0800
implemented merge
---
tdigest/include/tdigest.hpp | 2 +-
tdigest/include/tdigest_impl.hpp | 39 ++++++++++++++++++++++++++-------------
tdigest/test/tdigest_test.cpp | 39 +++++++++++++++++++++++++++++++++++++++
3 files changed, 66 insertions(+), 14 deletions(-)
diff --git a/tdigest/include/tdigest.hpp b/tdigest/include/tdigest.hpp
index 8ebd13c..1880366 100644
--- a/tdigest/include/tdigest.hpp
+++ b/tdigest/include/tdigest.hpp
@@ -192,7 +192,7 @@ private:
void merge_new_values();
void merge_new_values(bool force, uint16_t k);
- void merge_new_values(vector_centroid& centroids, uint64_t weight, uint16_t k, bool reverse);
+ void merge_new_values(uint16_t k); // sorts buffer_ together with the current centroids_, recompresses them into centroids_ using k, then clears buffer_; unlike merge_new_values(bool, uint16_t) it performs no emptiness/force check
};
} /* namespace datasketches */
diff --git a/tdigest/include/tdigest_impl.hpp b/tdigest/include/tdigest_impl.hpp
index 5f57e85..b6f639c 100644
--- a/tdigest/include/tdigest_impl.hpp
+++ b/tdigest/include/tdigest_impl.hpp
@@ -55,6 +55,8 @@ buffered_weight_(0)
centroids_capacity_ = internal_k_ + fudge;
}
if (buffer_capacity_ < 2 * centroids_capacity_) buffer_capacity_ = 2 * centroids_capacity_;
+ centroids_.reserve(centroids_capacity_);
+ buffer_.reserve(buffer_capacity_);
}
template<typename T, typename A>
@@ -69,6 +71,18 @@ void tdigest<T, A>::update(T value) {
template<typename T, typename A>
void tdigest<T, A>::merge(tdigest& other) {
+ if (other.is_empty()) return;
+ const size_t num = buffer_.size() + centroids_.size() + other.buffer_.size() + other.centroids_.size();
+ buffer_.reserve(num);
+ std::copy(other.buffer_.begin(), other.buffer_.end(), std::back_inserter(buffer_));
+ std::copy(other.centroids_.begin(), other.centroids_.end(), std::back_inserter(buffer_));
+ buffered_weight_ += other.get_total_weight();
+ // always take other's exact extremes: a compacting merge only sees centroid means, which
+ // can lie strictly inside [min, max] when an outermost centroid holds several points
+ min_ = std::min(min_, other.get_min_value());
+ max_ = std::max(max_, other.get_max_value());
+ // compress once the combined count of buffered items and centroids exceeds buffer_capacity_
+ if (num > buffer_capacity_) merge_new_values(internal_k_);
}
template<typename T, typename A>
@@ -205,28 +219,24 @@ void tdigest<T, A>::merge_new_values() {
template<typename T, typename A>
void tdigest<T, A>::merge_new_values(bool force, uint16_t k) {
if (total_weight_ == 0 && buffered_weight_ == 0) return;
- if (force || buffered_weight_ > 0) {
- merge_new_values(buffer_, buffered_weight_, k, USE_ALTERNATING_SORT & (merge_count_ & 1));
- ++merge_count_;
- buffer_.clear();
- buffered_weight_ = 0;
- }
+ if (force || buffered_weight_ > 0) merge_new_values(k); // skip the work when nothing is buffered unless forced
}
template<typename T, typename A>
-void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64_t weight, uint16_t k, bool reverse) {
- for (const auto& centroid: centroids_) incoming_centroids.push_back(centroid);
+void tdigest<T, A>::merge_new_values(uint16_t k) {
+ const bool reverse = USE_ALTERNATING_SORT & (merge_count_ & 1); // reversed sort order on odd merge counts (if USE_ALTERNATING_SORT)
+ for (const auto& centroid: centroids_) buffer_.push_back(centroid); // existing centroids are re-merged with the incoming ones
centroids_.clear();
- std::stable_sort(incoming_centroids.begin(), incoming_centroids.end(), centroid_cmp(reverse));
- total_weight_ += weight;
- auto it = incoming_centroids.begin();
+ std::stable_sort(buffer_.begin(), buffer_.end(), centroid_cmp(reverse));
+ total_weight_ += buffered_weight_; // buffered weight becomes part of the compressed total
+ auto it = buffer_.begin();
centroids_.push_back(*it);
++it;
double weight_so_far = 0;
const double normalizer = scale_function().normalizer(k, total_weight_);
double k1 = scale_function().k(0, normalizer);
double w_limit = total_weight_ * scale_function().q(k1 + 1, normalizer);
- while (it != incoming_centroids.end()) {
+ while (it != buffer_.end()) {
const double proposed_weight = centroids_.back().get_weight() + it->get_weight();
const double projected_weight = weight_so_far + proposed_weight;
bool add_this;
@@ -237,7 +247,7 @@ void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64
} else {
add_this = projected_weight <= w_limit;
}
- if (std::distance(incoming_centroids.begin(), it) == 1 || std::distance(incoming_centroids.end(), it) == 1) {
+ if (std::distance(buffer_.begin(), it) == 1 || std::distance(it, buffer_.end()) == 1) { // never merge the 2nd or the last incoming centroid; distance(it, end) == 1 means `it` is the last element
add_this = false;
}
if (add_this) {
@@ -257,6 +267,9 @@ void tdigest<T, A>::merge_new_values(vector_centroid& incoming_centroids, uint64
min_ = std::min(min_, centroids_.front().get_mean());
max_ = std::max(max_, centroids_.back().get_mean());
}
+ ++merge_count_;
+ buffer_.clear(); // buffer_ contents are now represented by centroids_
+ buffered_weight_ = 0;
}
} /* namespace datasketches */
diff --git a/tdigest/test/tdigest_test.cpp b/tdigest/test/tdigest_test.cpp
index e90faa7..08fbe80 100644
--- a/tdigest/test/tdigest_test.cpp
+++ b/tdigest/test/tdigest_test.cpp
@@ -108,4 +108,43 @@ TEST_CASE("rank - repeated block", "[tdigest]") {
REQUIRE(td.get_rank(3.01) == 1);
}
+TEST_CASE("merge small", "[tdigest]") {
+ tdigest_double target(10);
+ tdigest_double source(10);
+ target.update(1);
+ source.update(2);
+ target.update(2);
+ source.update(3);
+ target.merge(source); // target now summarizes {1, 2, 2, 3}
+ REQUIRE(target.get_total_weight() == 4);
+ REQUIRE(target.get_min_value() == 1);
+ REQUIRE(target.get_max_value() == 3);
+ REQUIRE(target.get_rank(0.99) == 0); // strictly below the minimum
+ REQUIRE(target.get_rank(1) == 0.125);
+ REQUIRE(target.get_rank(2) == 0.5);
+ REQUIRE(target.get_rank(3) == 0.875);
+ REQUIRE(target.get_rank(3.01) == 1); // strictly above the maximum
+}
+
+TEST_CASE("merge large", "[tdigest]") {
+ const size_t n = 10000;
+ tdigest_double td1(100);
+ tdigest_double td2(100);
+ for (size_t i = 0; i < n / 2; ++i) {
+ td1.update(i);
+ td2.update(n / 2 + i);
+ }
+ td1.merge(td2);
+ REQUIRE(td2.get_total_weight() == n / 2); // merge must leave the source sketch intact
+ REQUIRE(td2.get_min_value() == n / 2);
+ REQUIRE(td1.get_total_weight() == n);
+ REQUIRE(td1.get_min_value() == 0);
+ REQUIRE(td1.get_max_value() == n - 1);
+ REQUIRE(td1.get_rank(0) == Approx(0).margin(0.0001));
+ REQUIRE(td1.get_rank(n / 4) == Approx(0.25).margin(0.0001));
+ REQUIRE(td1.get_rank(n / 2) == Approx(0.5).margin(0.0001));
+ REQUIRE(td1.get_rank(n * 3 / 4) == Approx(0.75).margin(0.0001));
+ REQUIRE(td1.get_rank(n) == 1);
+}
+
} /* namespace datasketches */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org