You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by jm...@apache.org on 2022/04/20 23:27:33 UTC

[datasketches-cpp] branch quantiles updated: incomplete merge implementation

This is an automated email from the ASF dual-hosted git repository.

jmalkin pushed a commit to branch quantiles
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git


The following commit(s) were added to refs/heads/quantiles by this push:
     new 06331f5  incomplete merge implementation
06331f5 is described below

commit 06331f5120763644e3da1a105044fa7e651bf876
Author: Jon <jm...@apache.org>
AuthorDate: Wed Apr 20 16:25:24 2022 -0700

    incomplete merge implementation
---
 common/include/common_defs.hpp              |   7 ++
 common/test/test_type.hpp                   |   2 +
 quantiles/include/quantiles_sketch.hpp      |   4 +
 quantiles/include/quantiles_sketch_impl.hpp | 141 +++++++++++++++++++++++++++-
 quantiles/test/CMakeLists.txt               |   5 +-
 quantiles/test/quantiles_sketch_test.cpp    |  27 ++++--
 sampling/include/var_opt_sketch_impl.hpp    |  10 --
 7 files changed, 172 insertions(+), 24 deletions(-)

diff --git a/common/include/common_defs.hpp b/common/include/common_defs.hpp
index 1c3a6e0..a1ab6cf 100644
--- a/common/include/common_defs.hpp
+++ b/common/include/common_defs.hpp
@@ -40,6 +40,13 @@ template<typename A> using string = std::basic_string<char, std::char_traits<cha
 static std::independent_bits_engine<std::mt19937, 1, uint32_t>
   random_bit(static_cast<uint32_t>(std::chrono::system_clock::now().time_since_epoch().count()));
 
+// common random declarations; being static in a header, every including translation unit gets its own copies
+namespace random_utils {
+  static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
+  static std::mt19937_64 rand(rd()); // 64-bit Mersenne Twister engine, seeded once from rd
+  static std::uniform_real_distribution<> next_double(0.0, 1.0); // uniform doubles in [0.0, 1.0)
+}
+
 
 // utility function to hide unused compiler warning
 // usually has no additional cost
diff --git a/common/test/test_type.hpp b/common/test/test_type.hpp
index 18be598..dacb370 100644
--- a/common/test/test_type.hpp
+++ b/common/test/test_type.hpp
@@ -20,7 +20,9 @@
 #ifndef CLASS_TEST_TYPE_HPP_
 #define CLASS_TEST_TYPE_HPP_
 
+#include <cstring>
 #include <iostream>
+#include "memory_operations.hpp"
 
 namespace datasketches {
 
diff --git a/quantiles/include/quantiles_sketch.hpp b/quantiles/include/quantiles_sketch.hpp
index ec3b475..d033cb9 100644
--- a/quantiles/include/quantiles_sketch.hpp
+++ b/quantiles/include/quantiles_sketch.hpp
@@ -494,6 +494,7 @@ private:
                                        Level& buf_size_2k, bool apply_as_update,
                                        quantiles_sketch<T,C,A>& sketch);
   static void zip_buffer(Level& buf_in, Level& buf_out);
+  static void zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride); // samples buf_in into buf_out; buf_in is not cleared
   static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);
 
   template<typename SerDe>
@@ -513,6 +514,9 @@ private:
   static uint32_t compute_valid_levels(uint64_t bit_pattern);
   static uint8_t compute_levels_needed(uint16_t k, uint64_t n);
 
+  template<typename FwdT>
+  void downsampling_merge(FwdT&& other); // merges other (larger k, a multiple of k_) into this sketch
+
   /**
    * Returns the zero-based bit position of the lowest zero bit of <i>bits</i> starting at
    * <i>startingBit</i>. If input is all ones, this returns 64.
diff --git a/quantiles/include/quantiles_sketch_impl.hpp b/quantiles/include/quantiles_sketch_impl.hpp
index 0e63efc..1119829 100644
--- a/quantiles/include/quantiles_sketch_impl.hpp
+++ b/quantiles/include/quantiles_sketch_impl.hpp
@@ -29,6 +29,7 @@
 
 #include "common_defs.hpp"
 #include "count_zeros.hpp"
+#include "conditional_forward.hpp"
 #include "quantiles_sketch.hpp"
 
 namespace datasketches {
@@ -164,6 +165,40 @@ void quantiles_sketch<T, C, A>::update(FwdT&& item) {
     process_full_base_buffer();
 }
 
+// Merges other into this sketch (work in progress: see the TODO below). other is modified
+// only if its items can be moved from.
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void quantiles_sketch<T, C, A>::merge(FwdT&& other) {
+  if (other.is_empty()) {
+    return; // nothing to do
+  }
+
+  if (!other.is_estimation_mode()) {
+    // other is exact, stream in regardless of k; bind by reference, because a by-value
+    // loop variable would copy every item before it is forwarded
+    for (auto& item : other.base_buffer_) {
+      update(conditional_forward<FwdT>(item));
+    }
+    return; // we're done
+  }
+
+  if (!is_empty()) {
+    // TODO: merging an estimation-mode sketch into a non-empty sketch is not implemented;
+    // other is ignored for now and only this diagnostic is emitted
+    std::cerr << "Merge, possibly downsampling" << std::endl;
+    return;
+  }
+
+  if (k_ >= other.get_k()) {
+    // this sketch is empty: take over other's content (this adopts other's k -- TODO confirm).
+    // Do not merge() into a fresh empty sketch here: that re-enters this branch forever.
+    *this = quantiles_sketch<T, C, A>(std::forward<FwdT>(other));
+  } else { // k_ < other.get_k()
+    downsampling_merge(std::forward<FwdT>(other));
+  }
+}
+
 template<typename T, typename C, typename A>
 template<typename SerDe>
 void quantiles_sketch<T, C, A>::serialize(std::ostream& os, const SerDe& serde) const {
@@ -772,7 +807,7 @@ uint8_t quantiles_sketch<T, C, A>::compute_levels_needed(const uint16_t k, const
 
 template<typename T, typename C, typename A>
 void quantiles_sketch<T, C, A>::check_k(uint16_t k) {
-  if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & k - 1) != 0) {
+  if (k < quantiles_constants::MIN_K || k > quantiles_constants::MAX_K || (k & (k - 1)) != 0) {
     throw std::invalid_argument("k must be a power of 2 that is >= "
       + std::to_string(quantiles_constants::MIN_K) + " and <= "
       + std::to_string(quantiles_constants::MAX_K) + ". Found: " + std::to_string(k));
@@ -879,9 +914,9 @@ bool quantiles_sketch<T, C, A>::grow_levels_if_needed() {
 
 template<typename T, typename C, typename A>
 void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
-                                                            Level& buf_size_k, Level& buf_size_2k, 
-                                                            bool apply_as_update,
-                                                            quantiles_sketch<T,C,A>& sketch) {
+                                                         Level& buf_size_k, Level& buf_size_2k, 
+                                                         bool apply_as_update,
+                                                         quantiles_sketch<T,C,A>& sketch) {
   const uint64_t bit_pattern = sketch.bit_pattern_;
   const int k = sketch.k_;
 
@@ -893,7 +928,9 @@ void quantiles_sketch<T, C, A>::in_place_propagate_carry(uint8_t starting_level,
     zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
   } else {
     // merge_into version of computation
-    std::move(&buf_size_k[0], &buf_size_k[0] + k, &sketch.levels_[ending_level][0]);
+    for (uint16_t i = 0; i < k; ++i) {
+      sketch.levels_[ending_level].push_back(std::move(buf_size_k[i]));
+    }
   }
 
   for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
@@ -929,6 +966,22 @@ void quantiles_sketch<T, C, A>::zip_buffer(Level& buf_in, Level& buf_out) {
   buf_in.clear();
 }
 
+// Keeps one item out of every 2^stride items of buf_in (random phase) in the empty buf_out.
+// NOTE: stride is the log2 of the step, as in the size check below and at the call site.
+template<typename T, typename C, typename A>
+void quantiles_sketch<T, C, A>::zip_buffer_with_stride(Level& buf_in, Level& buf_out, uint16_t stride) {
+  const size_t step = static_cast<size_t>(1) << stride;
+  const size_t k = buf_out.capacity();
+  assert(buf_in.size() == step * k);
+  assert(buf_out.size() == 0);
+  // random starting offset in [0, step), so every position within a group can be selected
+  std::uniform_int_distribution<size_t> dist(0, step - 1);
+  for (size_t i = dist(random_utils::rand), o = 0; o < k; i += step, ++o) {
+    buf_out.push_back(buf_in[i]);
+  }
+  // do not clear input buffer
+}
+
 template<typename T, typename C, typename A>
 void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2, Level& dst) {
   assert(src_1.size() == src_2.size());
@@ -955,6 +1008,84 @@ void quantiles_sketch<T, C, A>::merge_two_size_k_buffers(Level& src_1, Level& sr
   }
 }
 
+/**
+ * Merges the other sketch into the current sketch, which has a smaller value of K.
+ * The ratio of the two K values must be a power of 2, i.e. other.get_k() = this.get_k() *
+ * 2^(nonnegative integer). Throws std::invalid_argument if other.get_k() % k_ != 0.
+ * other is modified only if elements can be moved out of it.
+ */
+template<typename T, typename C, typename A>
+template<typename FwdT>
+void quantiles_sketch<T, C, A>::downsampling_merge(FwdT&& other) { 
+  if (other.get_k() % k_ != 0) {
+    throw std::invalid_argument("other.get_k() is not a multiple of k_");
+  }
+  assert(!other.is_empty());
+
+  const uint32_t downsample_factor = other.get_k() / k_;
+  const uint32_t lg_sample_factor = count_trailing_zeros_in_u32(downsample_factor);
+
+  uint64_t new_n = n_ + other.get_n(); // target count, captured before the update() calls below
+
+  // stream the items of other's base buffer in through update()
+  for (uint16_t i = 0; i < other.base_buffer_.size(); ++i) {
+    update(conditional_forward<FwdT>(other.base_buffer_[i]));
+  }
+
+  // check (after moving raw items) if we need to extend levels array
+  uint8_t levels_needed = compute_levels_needed(k_, new_n);
+  if (levels_needed > levels_.size()) {
+    levels_.reserve(levels_needed);
+    while (levels_.size() < levels_needed) {
+      Level empty_level(allocator_);
+      empty_level.reserve(k_);
+      levels_.push_back(std::move(empty_level));
+    }
+  }
+
+  Level down_buf(allocator_); // receives one downsampled level of other at a time
+  down_buf.reserve(k_);
+
+  Level scratch_buf(allocator_); // passed to propagate-carry as its size-2k buffer
+  scratch_buf.reserve(2 * k_);
+
+  uint64_t src_pattern = other.bit_pattern_; // bit i set => other.levels_[i] is processed below
+  for (uint8_t src_lvl = 0; src_pattern != 0; ++src_lvl, src_pattern >>= 1) {
+    if ((src_pattern & 1) > 0) {
+      down_buf.clear();
+      scratch_buf.clear();
+
+      // zip with stride, leaving input buffer intact
+      zip_buffer_with_stride(other.levels_[src_lvl], down_buf, lg_sample_factor);
+
+      // propagate-carry into this sketch, starting lg_sample_factor levels above src_lvl
+      in_place_propagate_carry(src_lvl + lg_sample_factor,
+                               down_buf, scratch_buf,
+                               false, *this);
+      // update n_ at the end
+    }
+  }
+  n_ = new_n;
+  assert((n_ / (2 * k_)) == bit_pattern_); // internal consistency check
+
+  // update min/max values
+  // can't just check is_empty() since min/max might not have been set if
+  // there were no base buffer items added via update()
+  if (min_value_ == nullptr) {
+    min_value_ = new (allocator_.allocate(1)) T(*other.min_value_);
+  } else {
+    if (C()(*other.min_value_, *min_value_))
+      *min_value_ = conditional_forward<FwdT>(*other.min_value_);
+  }
+
+  if (max_value_ == nullptr) {
+    max_value_ = new (allocator_.allocate(1)) T(*other.max_value_);
+  } else {
+    if (C()(*max_value_, *other.max_value_))
+      *max_value_ = conditional_forward<FwdT>(*other.max_value_);
+  }
+}
+
 template<typename T, typename C, typename A>
 uint8_t quantiles_sketch<T, C, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
   uint8_t pos = starting_bit & 0X3F;
diff --git a/quantiles/test/CMakeLists.txt b/quantiles/test/CMakeLists.txt
index 9b65a8e..74ef73f 100644
--- a/quantiles/test/CMakeLists.txt
+++ b/quantiles/test/CMakeLists.txt
@@ -18,6 +18,7 @@
 add_executable(quantiles_test)
 
 target_link_libraries(quantiles_test quantiles common_test)
+#target_link_libraries(quantiles_test quantiles common)
 
 set_target_properties(quantiles_test PROPERTIES
   CXX_STANDARD 11
@@ -40,7 +41,5 @@ target_sources(quantiles_test
   PRIVATE
     quantiles_sketch_test.cpp
     quantiles_compatibility_test.cpp
-    #quantiles_sketch_custom_type_test.cpp
-    #quantiles_sketch_validation.cpp
-    #kolmogorov_smirnov_test.cpp
+    #simple_test.cpp
 )
diff --git a/quantiles/test/quantiles_sketch_test.cpp b/quantiles/test/quantiles_sketch_test.cpp
index 3c17ef8..2b6dc4c 100644
--- a/quantiles/test/quantiles_sketch_test.cpp
+++ b/quantiles/test/quantiles_sketch_test.cpp
@@ -474,15 +474,30 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
     float split_points[1] = {std::numeric_limits<float>::quiet_NaN()};
     REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 1), std::invalid_argument);
   }
+
+  SECTION("merge, manual testing") {
+    // merge a k=256 sketch into an empty k=32 sketch; expected to take the downsampling path
+    quantiles_float_sketch sk1(32, 0);
+    quantiles_float_sketch sk2(256, 0);
+    const int n = 10000;
+    for (int i = 0; i < n; i++) {
+      sk2.update(static_cast<float>((2 * n) - i - 1)); // descending values in [n, 2n - 1]
+    }
+    REQUIRE(sk1.is_empty());
+    REQUIRE(sk2.get_n() == static_cast<uint64_t>(n));
+
+    sk1.merge(sk2);
+
+    // n, min and max must carry over exactly, whichever items the downsampling keeps
+    REQUIRE(sk1.get_n() == static_cast<uint64_t>(n));
+    REQUIRE(sk1.get_min_value() == static_cast<float>(n));
+    REQUIRE(sk1.get_max_value() == static_cast<float>((2 * n) - 1));
+  }
+
 /*
   SECTION("merge") {
     quantiles_float_sketch sketch1(128, 0);
     quantiles_float_sketch sketch2(128, 0);
-    const int n = 10000;
-    for (int i = 0; i < n; i++) {
-      sketch1.update(static_cast<float>(i));
-      sketch2.update(static_cast<float>((2 * n) - i - 1));
-    }
 
     REQUIRE(sketch1.get_min_value() == 0.0f);
     REQUIRE(sketch1.get_max_value() == n - 1);
@@ -575,6 +590,7 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
     REQUIRE(sketch2.get_max_value() == 999999.0f);
   }
 */
+
   SECTION("sketch of ints") {
     quantiles_sketch<int> sketch;
     REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
@@ -669,7 +685,6 @@ TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
     REQUIRE(sketch2.get_rank(std::to_string(n)) == sketch1.get_rank(std::to_string(n)));
   }
 
-
   SECTION("sketch of strings, single item, bytes") {
     quantiles_string_sketch sketch1(64, 0);
     sketch1.update("a");
diff --git a/sampling/include/var_opt_sketch_impl.hpp b/sampling/include/var_opt_sketch_impl.hpp
index b2de71f..dd4d463 100644
--- a/sampling/include/var_opt_sketch_impl.hpp
+++ b/sampling/include/var_opt_sketch_impl.hpp
@@ -1683,16 +1683,6 @@ bool var_opt_sketch<T, S, A>::iterator::get_mark() const {
   return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
 }
 
-
-
-// ******************** MOVE TO COMMON UTILS AREA EVENTUALLY *********************
-
-namespace random_utils {
-  static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
-  static std::mt19937_64 rand(rd());
-  static std::uniform_real_distribution<> next_double(0.0, 1.0);
-}
-
 /**
  * Checks if target sampling allocation is more than 50% of max sampling size.
  * If so, returns max sampling size, otherwise passes through target size.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org