You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by jm...@apache.org on 2022/04/20 23:27:33 UTC
[datasketches-cpp] branch quantiles updated: incomplete merge implementation
This is an automated email from the ASF dual-hosted git repository.
jmalkin pushed a commit to branch quantiles
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git
The following commit(s) were added to refs/heads/quantiles by this push:
new 06331f5 incomplete merge implementation
06331f5 is described below
commit 06331f5120763644e3da1a105044fa7e651bf876
Author: Jon <jm...@apache.org>
AuthorDate: Wed Apr 20 16:25:24 2022 -0700
incomplete merge implementation
---
common/include/common_defs.hpp | 7 ++
common/test/test_type.hpp | 2 +
quantiles/include/quantiles_sketch.hpp | 4 +
quantiles/include/quantiles_sketch_impl.hpp | 141 +++++++++++++++++++++++++++-
quantiles/test/CMakeLists.txt | 5 +-
quantiles/test/quantiles_sketch_test.cpp | 27 ++++--
sampling/include/var_opt_sketch_impl.hpp | 10 --
7 files changed, 172 insertions(+), 24 deletions(-)
diff --git a/common/include/common_defs.hpp b/common/include/common_defs.hpp
index 1c3a6e0..a1ab6cf 100644
--- a/common/include/common_defs.hpp
+++ b/common/include/common_defs.hpp
@@ -40,6 +40,13 @@ template<typename A> using string = std::basic_string<char, std::char_traits<cha
static std::independent_bits_engine<std::mt19937, 1, uint32_t>
random_bit(static_cast<uint32_t>(std::chrono::system_clock::now().time_since_epoch().count()));
+// Random-number utilities shared across sketch implementations.
+// Every object is declared `static`, so each translation unit that includes
+// this header owns (and seeds) its own copies.
+namespace random_utils {
+  // Seed source. NOTE: possibly unsafe in MinGW with GCC < 9.2, where
+  // std::random_device may be deterministic.
+  static std::random_device rd;
+  // 64-bit Mersenne Twister engine, seeded once from rd.
+  static std::mt19937_64 rand{rd()};
+  // Uniform distribution over [0.0, 1.0); invoke with an engine, e.g. next_double(rand).
+  static std::uniform_real_distribution<> next_double{0.0, 1.0};
+}
+
// utility function to hide unused compiler warning
// usually has no additional cost
diff --git a/common/test/test_type.hpp b/common/test/test_type.hpp
index 18be598..dacb370 100644
--- a/common/test/test_type.hpp
+++ b/common/test/test_type.hpp
@@ -20,7 +20,9 @@
#ifndef CLASS_TEST_TYPE_HPP_
#define CLASS_TEST_TYPE_HPP_
+#include <cstring>
#include <iostream>
+#include "memory_operations.hpp"
namespace datasketches {
diff --git a/quantiles/include/quantiles_sketch.hpp b/quantiles/include/quantiles_sketch.hpp
index ec3b475..d033cb9 100644
--- a/quantiles/include/quantiles_sketch.hpp
+++ b/quantiles/include/quantiles_sketch.hpp
@@ -494,6 +494,7 @@ private:
Level& buf_size_2k, bool apply_as_update,
quantiles_sketch<T,C,A>& sketch);
static void zip_buffer(Level& buf_in, Level& buf_out);
+ static void zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride);
static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);
template<typename SerDe>
@@ -513,6 +514,9 @@ private:
static uint32_t compute_valid_levels(uint64_t bit_pattern);
static uint8_t compute_levels_needed(uint16_t k, uint64_t n);
+ template<typename FwdT>
+ void downsampling_merge(FwdT&& other);
+
/**
* Returns the zero-based bit position of the lowest zero bit of <i>bits</i> starting at
* <i>startingBit</i>. If input is all ones, this returns 64.
diff --git a/quantiles/include/quantiles_sketch_impl.hpp b/quantiles/include/quantiles_sketch_impl.hpp
index 0e63efc..1119829 100644
--- a/quantiles/include/quantiles_sketch_impl.hpp
+++ b/quantiles/include/quantiles_sketch_impl.hpp
@@ -29,6 +29,7 @@
#include "common_defs.hpp"
#include "count_zeros.hpp"
+#include "conditional_forward.hpp"
#include "quantiles_sketch.hpp"
namespace datasketches {
@@ -164,6 +165,40 @@ void quantiles_sketch<T, C, A>::update(FwdT&& item) {
process_full_base_buffer();
}
+/**
+ * Merges another sketch into this one.
+ *
+ * - An empty other is a no-op.
+ * - An other in exact mode (all items still in its base buffer) is streamed in
+ *   item by item, regardless of either sketch's k.
+ * - If this sketch is empty and k_ >= other.get_k(), this sketch is replaced by
+ *   a copy of other (so the result takes on other's k).
+ * - If this sketch is empty and k_ < other.get_k(), other is downsampled into
+ *   this sketch via downsampling_merge().
+ * - Merging an estimation-mode other into a non-empty sketch is not implemented
+ *   yet; it throws instead of silently dropping other's data.
+ *
+ * @param other sketch to merge; items are moved out of it only if it is an rvalue
+ * @throws std::logic_error for the not-yet-implemented non-empty case
+ */
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void quantiles_sketch<T, C, A>::merge(FwdT&& other) {
+  if (other.is_empty()) {
+    return; // nothing to do
+  }
+
+  if (!other.is_estimation_mode()) {
+    // other is exact, stream in regardless of k.
+    // Bind by reference: items are moved from an rvalue other and copied
+    // exactly once from an lvalue other.
+    for (auto& item : other.base_buffer_) {
+      update(conditional_forward<FwdT>(item));
+    }
+    return; // we're done
+  }
+
+  if (!is_empty()) {
+    // TODO: merge an estimation-mode sketch into a non-empty sketch.
+    // Fail loudly rather than returning with other's data silently discarded.
+    throw std::logic_error("quantiles_sketch::merge: merging into a non-empty sketch is not implemented yet");
+  }
+
+  if (k_ >= other.get_k()) {
+    // Replace self with a copy (or move) of other. Merging other into a fresh
+    // sketch with our k would re-enter this very branch and recurse forever.
+    quantiles_sketch<T, C, A> sk_copy(std::forward<FwdT>(other));
+    *this = std::move(sk_copy);
+  } else { // k_ < other.get_k()
+    // copy other into self, downsampling its levels to our k
+    downsampling_merge(std::forward<FwdT>(other));
+  }
+}
+
template<typename T, typename C, typename A>
template<typename SerDe>
void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
@@ -772,7 +807,7 @@ uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(const uint16_t k, const
template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
- if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & k - 1) != 0) {
+ if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
throw std::invalid_argument("k must be a power of 2 that is >= "
+ std::to_string(quantiles_constants::MIN_K) + " and <= "
+ std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
@@ -879,9 +914,9 @@ bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
- Level& buf_size_k, Level& buf_size_2k,
- bool apply_as_update,
- quantiles_sketch<T,C,A>& sketch) {
+ Level& buf_size_k, Level& buf_size_2k,
+ bool apply_as_update,
+ quantiles_sketch<T,C,A>& sketch) {
const uint64_t bit_pattern = sketch.bit_pattern_;
const int k = sketch.k_;
@@ -893,7 +928,9 @@ void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
} else {
// merge_into version of computation
- std::move(&buf_size_k[0], &buf_size_k[0] + k, &sketch.levels_[ending_level][0]);
+ for (uint16_t i = 0; i < k; ++i) {
+ sketch.levels_[ending_level].push_back(std::move(buf_size_k[i]));
+ }
}
for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
@@ -929,6 +966,22 @@ void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
buf_in.clear();
}
+/**
+ * Downsamples buf_in into buf_out by keeping one item out of every 2^stride,
+ * starting at a uniformly random offset in [0, 2^stride).
+ *
+ * @param buf_in  input level; its size must be a multiple of 2^stride. Items are
+ *                copied, so the input buffer is left intact.
+ * @param buf_out empty output buffer; receives buf_in.size() / 2^stride items
+ * @param stride  base-2 logarithm of the sampling stride (downsampling_merge
+ *                passes log2(other.get_k() / k_))
+ */
+template<typename T, typename C, typename A>
+void quantiles_sketch<T, C, A>::zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride) {
+  assert(stride < 32);
+  const uint32_t step = 1u << stride;
+  assert(buf_in.size() % step == 0);
+  assert(buf_out.size() == 0);
+
+  // Random offset in range [0, step); a closed range, hence step - 1
+  std::uniform_int_distribution<uint32_t> dist(0, step - 1);
+  const uint32_t rand_offset = dist(random_utils::rand);
+
+  // derive the output size from the input rather than buf_out.capacity(),
+  // which reserve() only bounds from below
+  const size_t num_out = buf_in.size() / step;
+  for (size_t i = rand_offset, o = 0; o < num_out; i += step, ++o) {
+    buf_out.push_back(buf_in[i]);
+  }
+  // do not clear input buffer
+}
+
+
template<typename T, typename C, typename A>
void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2, Level& dst) {
assert(src_1.size() == src_2.size());
@@ -955,6 +1008,84 @@ void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& sr
}
}
+/**
+ * Merges other into this sketch when other was built with a k at least as
+ * large as this sketch's k, downsampling other's levels to k_ along the way.
+ *
+ * Requires other.get_k() = k_ * 2^m for some m >= 0. Since check_k() only
+ * admits powers of 2, any k_ that divides other.get_k() satisfies this.
+ *
+ * other is modified only if it is an rvalue: its base-buffer items (and, when
+ * they replace ours, its min/max values) are moved out. Its levels are read via
+ * zip_buffer_with_stride, which leaves them intact.
+ *
+ * @param other non-empty sketch to merge (asserted)
+ * @throws std::invalid_argument if other.get_k() is not a multiple of k_
+ */
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void quantiles_sketch<T, C, A>::downsampling_merge(FwdT&& other) {
+  // also rejects other.get_k() < k_, since the remainder is then other.get_k() itself
+  if (other.get_k() % k_ != 0) {
+    throw std::invalid_argument("other.get_k() is not a multiple of k_");
+  }
+  assert(!other.is_empty());
+
+  // lg_sample_factor = log2(downsample_factor); exact because the ratio of two
+  // power-of-2 k values is itself a power of 2
+  const uint32_t downsample_factor = other.get_k() / k_;
+  const uint32_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
+
+  // total item count after the merge; assigned to n_ only after all levels are merged
+  uint64_t new_n = n_ + other.get_n();
+
+  // move items from other's base buffer (copied instead when other is an lvalue)
+  for (uint16_t i = 0; i < other.base_buffer_.size(); ++i) {
+    update(conditional_forward<FwdT>(other.base_buffer_[i]));
+  }
+
+  // check (after moving raw items) if we need to extend the levels array so
+  // that it has every level required to hold new_n items at our k
+  uint8_t levels_needed = compute_levels_needed(k_, new_n);
+  if (levels_needed > levels_.size()) {
+    levels_.reserve(levels_needed);
+    while (levels_.size() < levels_needed) {
+      Level empty_level(allocator_);
+      empty_level.reserve(k_);
+      levels_.push_back(std::move(empty_level));
+    }
+  }
+
+  // reusable buffers: down_buf receives one downsampled level (k_ items);
+  // scratch_buf is the 2k working space handed to in_place_propagate_carry
+  Level down_buf(allocator_);
+  down_buf.reserve(k_);
+
+  Level scratch_buf(allocator_);
+  scratch_buf.reserve(2 * k_);
+
+  // walk other's levels; a set bit src_lvl in bit_pattern_ marks a populated level
+  uint64_t src_pattern = other.bit_pattern_;
+  for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
+    if ((src_pattern & 1) > 0) {
+      down_buf.clear();
+      scratch_buf.clear();
+
+      // zip with stride, leaving input buffer intact; note that the stride
+      // argument is the log2 of the downsampling factor
+      zip_buffer_with_stride(other.levels_[src_lvl], down_buf, lg_sample_factor);
+
+      // propagate-carry: each retained item stands for 2^lg_sample_factor times
+      // as many items, so the carry starts lg_sample_factor levels higher.
+      // apply_as_update == false selects the merge-into path.
+      in_place_propagate_carry(src_lvl + lg_sample_factor,
+                               down_buf, scratch_buf,
+                               false, *this);
+      // n_ is not adjusted per level; it is set once after this loop
+    }
+  }
+  n_ = new_n;
+  assert((n_ / (2 * k_)) == bit_pattern_); // internal consistency check
+
+  // update min/max values
+  // can't just check is_empty() since min/max might not have been set if
+  // there were no base buffer items added via update()
+  if (min_value_ == nullptr) {
+    min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+  } else {
+    if (C()(*other.min_value_, *min_value_))
+      *min_value_ = conditional_forward<FwdT>(*other.min_value_);
+  }
+
+  if (max_value_ == nullptr) {
+    max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
+  } else {
+    if (C()(*max_value_, *other.max_value_))
+      *max_value_ = conditional_forward<FwdT>(*other.max_value_);
+  }
+}
+
template<typename T, typename C, typename A>
uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
uint8_t pos = starting_bit & 0X3F;
diff --git a/quantiles/test/CMakeLists.txt b/quantiles/test/CMakeLists.txt
index 9b65a8e..74ef73f 100644
--- a/quantiles/test/CMakeLists.txt
+++ b/quantiles/test/CMakeLists.txt
@@ -18,6 +18,7 @@
add_executable(quantiles_test)
target_link_libraries(quantiles_test quantiles common_test)
+#target_link_libraries(quantiles_test quantiles common)
set_target_properties(quantiles_test PROPERTIES
CXX_STANDARD 11
@@ -40,7 +41,5 @@ target_sources(quantiles_test
PRIVATE
quantiles_sketch_test.cpp
quantiles_compatibility_test.cpp
- #quantiles_sketch_custom_type_test.cpp
- #quantiles_sketch_validation.cpp
- #kolmogorov_smirnov_test.cpp
+ #simple_test.cpp
)
diff --git a/quantiles/test/quantiles_sketch_test.cpp b/quantiles/test/quantiles_sketch_test.cpp
index 3c17ef8..2b6dc4c 100644
--- a/quantiles/test/quantiles_sketch_test.cpp
+++ b/quantiles/test/quantiles_sketch_test.cpp
@@ -474,15 +474,30 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
float split_points[1] = {std::numeric_limits<float>::quiet_NaN()};
REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 1), std::invalid_argument);
}
+
+  SECTION("merge: estimation mode into empty sketch with smaller k") {
+    quantiles_float_sketch sk1(32, 0);
+    quantiles_float_sketch sk2(256, 0);
+    const int n = 10000;
+    // values n .. 2n-1, inserted in descending order
+    for (int i = 0; i < n; i++) {
+      sk2.update(static_cast<float>((2 * n) - i - 1));
+    }
+    REQUIRE(sk2.is_estimation_mode());
+
+    // sk1 is empty and has a smaller k, so this exercises the downsampling path
+    sk1.merge(sk2);
+
+    REQUIRE(sk1.get_k() == 32);
+    REQUIRE(sk1.get_n() == static_cast<uint64_t>(n));
+    REQUIRE(sk1.get_min_value() == static_cast<float>(n));
+    REQUIRE(sk1.get_max_value() == static_cast<float>(2 * n - 1));
+
+    // sk2 was merged as an lvalue and must be left untouched
+    REQUIRE(sk2.get_n() == static_cast<uint64_t>(n));
+    REQUIRE(sk2.get_min_value() == static_cast<float>(n));
+    REQUIRE(sk2.get_max_value() == static_cast<float>(2 * n - 1));
+  }
+
/*
SECTION("merge") {
quantiles_float_sketch sketch1(128, 0);
quantiles_float_sketch sketch2(128, 0);
- const int n = 10000;
- for (int i = 0; i < n; i++) {
- sketch1.update(static_cast<float>(i));
- sketch2.update(static_cast<float>((2 * n) - i - 1));
- }
REQUIRE(sketch1.get_min_value() == 0.0f);
REQUIRE(sketch1.get_max_value() == n - 1);
@@ -575,6 +590,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_max_value() == 999999.0f);
}
*/
+
SECTION("sketch of ints") {
quantiles_sketch<int> sketch;
REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
@@ -669,7 +685,6 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
REQUIRE(sketch2.get_rank(std::to_string(n)) == sketch1.get_rank(std::to_string(n)));
}
-
SECTION("sketch of strings, single item, bytes") {
quantiles_string_sketch sketch1(64, 0);
sketch1.update("a");
diff --git a/sampling/include/var_opt_sketch_impl.hpp b/sampling/include/var_opt_sketch_impl.hpp
index b2de71f..dd4d463 100644
--- a/sampling/include/var_opt_sketch_impl.hpp
+++ b/sampling/include/var_opt_sketch_impl.hpp
@@ -1683,16 +1683,6 @@ bool var_opt_sketch<T, S, A>::iterator::get_mark() const {
return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
}
-
-
-// ******************** MOVE TO COMMON UTILS AREA EVENTUALLY *********************
-
-namespace random_utils {
- static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
- static std::mt19937_64 rand(rd());
- static std::uniform_real_distribution<> next_double(0.0, 1.0);
-}
-
/**
* Checks if target sampling allocation is more than 50% of max sampling size.
* If so, returns max sampling size, otherwise passes through target size.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org