You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/02/17 21:32:05 UTC

[incubator-datasketches-cpp] 01/01: moving merge

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch kll_moving_merge
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 462990200e22170aa2912433125c8a9801e7d16c
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Feb 17 13:31:50 2020 -0800

    moving merge
---
 common/test/test_type.hpp                | 12 ++---
 kll/include/kll_sketch.hpp               | 13 +++---
 kll/include/kll_sketch_impl.hpp          | 76 ++++++++++++++++++++++++++++----
 kll/test/kll_sketch_custom_type_test.cpp | 12 +++++
 4 files changed, 90 insertions(+), 23 deletions(-)

diff --git a/common/test/test_type.hpp b/common/test/test_type.hpp
index 487da08..cf4a44f 100644
--- a/common/test/test_type.hpp
+++ b/common/test/test_type.hpp
@@ -29,25 +29,25 @@ class test_type {
 public:
   // no default constructor should be required
   test_type(int value): value(value) {
-    if (DEBUG) std::cerr << "A constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type constructor" << std::endl;
   }
   ~test_type() {
-    if (DEBUG) std::cerr << "A destructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type destructor" << std::endl;
   }
   test_type(const test_type& other): value(other.value) {
-    if (DEBUG) std::cerr << "A copy constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type copy constructor" << std::endl;
   }
   // noexcept is important here so that, for instance, std::vector could move this type
   test_type(test_type&& other) noexcept : value(other.value) {
-    if (DEBUG) std::cerr << "A move constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type move constructor" << std::endl;
   }
   test_type& operator=(const test_type& other) {
-    if (DEBUG) std::cerr << "A copy assignment" << std::endl;
+    if (DEBUG) std::cerr << "test_type copy assignment" << std::endl;
     value = other.value;
     return *this;
   }
   test_type& operator=(test_type&& other) {
-    if (DEBUG) std::cerr << "A move assignment" << std::endl;
+    if (DEBUG) std::cerr << "test_type move assignment" << std::endl;
     value = other.value;
     return *this;
   }
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index 28f5fcb..d33c945 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -169,6 +169,7 @@ class kll_sketch {
     void update(const T& value);
     void update(T&& value);
     void merge(const kll_sketch& other);
+    void merge(kll_sketch&& other);
     bool is_empty() const;
     uint64_t get_n() const;
     uint32_t get_num_retained() const;
@@ -208,12 +209,6 @@ class kll_sketch {
       return size;
     }
 
-    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
-    // this method is for the user's convenience to predict the sketch size before serialization
-    // and is not used in the serialization and deserialization code
-    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
-    static size_t get_max_serialized_size_bytes(uint16_t k, uint64_t n);
-
     void serialize(std::ostream& os) const;
     typedef vector_u8<A> vector_bytes; // alias for users
     vector_bytes serialize(unsigned header_size_bytes = 0) const;
@@ -287,7 +282,8 @@ class kll_sketch {
     kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size);
 
     // common update code
-    inline uint32_t internal_update(const T& value);
+    inline void update_min_max(const T& value);
+    inline uint32_t internal_update();
 
     // The following code is only valid in the special case of exactly reaching capacity while updating.
     // It cannot be used while merging, while reducing k, or anything else.
@@ -302,8 +298,9 @@ class kll_sketch {
         const T* split_points, uint32_t size, double* buckets) const;
     void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
         const T* split_points, uint32_t size, double* buckets) const;
-    void merge_higher_levels(const kll_sketch& other, uint64_t final_n);
+    template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
     void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+    void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
     void assert_correct_total_weight() const;
     uint32_t safe_level_size(uint8_t level) const;
     uint32_t get_num_retained_above_level_zero() const;
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index efd76e2..55a97f6 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -153,18 +153,20 @@ kll_sketch<T, C, S, A>::~kll_sketch() {
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update(const T& value) {
-  const uint32_t index = internal_update(value);
+  update_min_max(value);
+  const uint32_t index = internal_update();
   new (&items_[index]) T(value);
 }
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update(T&& value) {
-  const uint32_t index = internal_update(value);
+  update_min_max(value);
+  const uint32_t index = internal_update();
   new (&items_[index]) T(std::move(value));
 }
 
 template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
+void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
   if (is_empty()) {
     min_value_ = new (A().allocate(1)) T(value);
     max_value_ = new (A().allocate(1)) T(value);
@@ -172,6 +174,10 @@ uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
     if (C()(value, *min_value_)) *min_value_ = value;
     if (C()(*max_value_, value)) *max_value_ = value;
   }
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::internal_update() {
   if (levels_[0] == 0) compress_while_updating();
   n_++;
   is_level_zero_sorted_ = false;
@@ -184,10 +190,6 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
   if (m_ != other.m_) {
     throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
   }
-  const uint64_t final_n = n_ + other.n_;
-  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
-    update(other.items_[i]);
-  }
   if (is_empty()) {
     min_value_ = new (A().allocate(1)) T(*other.min_value_);
     max_value_ = new (A().allocate(1)) T(*other.max_value_);
@@ -195,6 +197,11 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
     if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
     if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
   }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(other.items_[i]);
+  }
   if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
   n_ = final_n;
   if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
@@ -202,6 +209,30 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
 }
 
 template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(kll_sketch&& other) {
+  if (other.is_empty()) return;
+  if (m_ != other.m_) {
+    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+  }
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
+    max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
+  }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(std::move(other.items_[i]));
+  }
+  if (other.num_levels_ >= 2) merge_higher_levels(std::forward<kll_sketch>(other), final_n);
+  n_ = final_n;
+  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
+  assert_correct_total_weight();
+}
+
+template<typename T, typename C, typename S, typename A>
 bool kll_sketch<T, C, S, A>::is_empty() const {
   return n_ == 0;
 }
@@ -734,7 +765,8 @@ void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index,
 }
 
 template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
+template<typename O>
+void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
   const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
   auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
   const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
@@ -746,7 +778,7 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64
 
   const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
 
-  populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);
+  populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.get(), provisional_num_levels);
 
   const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
       worklevels.get(), outlevels.get(), is_level_zero_sorted_);
@@ -776,6 +808,7 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64
 }
 
 // this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version copies objects from the incoming sketch
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
   worklevels[0] = 0;
@@ -799,6 +832,31 @@ void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* wo
   }
 }
 
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version moves objects from the incoming sketch
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+  worklevels[0] = 0;
+
+  // the level zero data from "other" was already inserted into "this"
+  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+  worklevels[1] = safe_level_size(0);
+
+  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+    const uint32_t self_pop = safe_level_size(lvl);
+    const uint32_t other_pop = other.safe_level_size(lvl);
+    worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+    if ((self_pop > 0) and (other_pop == 0)) {
+      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+    } else if ((self_pop == 0) and (other_pop > 0)) {
+      kll_helper::move_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl], false);
+    } else if ((self_pop > 0) and (other_pop > 0)) {
+      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+    }
+  }
+}
+
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
   const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_));
diff --git a/kll/test/kll_sketch_custom_type_test.cpp b/kll/test/kll_sketch_custom_type_test.cpp
index de3b193..eeee0a5 100644
--- a/kll/test/kll_sketch_custom_type_test.cpp
+++ b/kll/test/kll_sketch_custom_type_test.cpp
@@ -36,6 +36,7 @@ class kll_sketch_custom_type_test: public CppUnit::TestFixture {
   CPPUNIT_TEST(merge_small);
   CPPUNIT_TEST(merge_higher_levels);
   CPPUNIT_TEST(serialize_deserialize);
+  CPPUNIT_TEST(moving_merge);
   CPPUNIT_TEST_SUITE_END();
 
 public:
@@ -150,6 +151,17 @@ public:
     CPPUNIT_ASSERT_EQUAL(sketch1.get_rank(n / 2), sketch2.get_rank(n / 2));
   }
 
+  void moving_merge() {
+    kll_test_type_sketch sketch1(8);
+    for (int i = 0; i < 10; i++) sketch1.update(i);
+    kll_test_type_sketch sketch2(8);
+    sketch2.update(10);
+    sketch2.merge(std::move(sketch1));
+    CPPUNIT_ASSERT_EQUAL(0, sketch2.get_min_value().get_value());
+    CPPUNIT_ASSERT_EQUAL(10, sketch2.get_max_value().get_value());
+    CPPUNIT_ASSERT_EQUAL(11, (int) sketch2.get_n());
+  }
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(kll_sketch_custom_type_test);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org