You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/02/17 21:32:04 UTC

[incubator-datasketches-cpp] branch kll_moving_merge created (now 4629902)

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a change to branch kll_moving_merge
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git.


      at 4629902  moving merge

This branch includes the following new commits:

     new 4629902  moving merge

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org


[incubator-datasketches-cpp] 01/01: moving merge

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch kll_moving_merge
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 462990200e22170aa2912433125c8a9801e7d16c
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Feb 17 13:31:50 2020 -0800

    moving merge
---
 common/test/test_type.hpp                | 12 ++---
 kll/include/kll_sketch.hpp               | 13 +++---
 kll/include/kll_sketch_impl.hpp          | 76 ++++++++++++++++++++++++++++----
 kll/test/kll_sketch_custom_type_test.cpp | 12 +++++
 4 files changed, 90 insertions(+), 23 deletions(-)

diff --git a/common/test/test_type.hpp b/common/test/test_type.hpp
index 487da08..cf4a44f 100644
--- a/common/test/test_type.hpp
+++ b/common/test/test_type.hpp
@@ -29,25 +29,25 @@ class test_type {
 public:
   // no default constructor should be required
   test_type(int value): value(value) {
-    if (DEBUG) std::cerr << "A constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type constructor" << std::endl;
   }
   ~test_type() {
-    if (DEBUG) std::cerr << "A destructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type destructor" << std::endl;
   }
   test_type(const test_type& other): value(other.value) {
-    if (DEBUG) std::cerr << "A copy constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type copy constructor" << std::endl;
   }
   // noexcept is important here so that, for instance, std::vector could move this type
   test_type(test_type&& other) noexcept : value(other.value) {
-    if (DEBUG) std::cerr << "A move constructor" << std::endl;
+    if (DEBUG) std::cerr << "test_type move constructor" << std::endl;
   }
   test_type& operator=(const test_type& other) {
-    if (DEBUG) std::cerr << "A copy assignment" << std::endl;
+    if (DEBUG) std::cerr << "test_type copy assignment" << std::endl;
     value = other.value;
     return *this;
   }
   test_type& operator=(test_type&& other) {
-    if (DEBUG) std::cerr << "A move assignment" << std::endl;
+    if (DEBUG) std::cerr << "test_type move assignment" << std::endl;
     value = other.value;
     return *this;
   }
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index 28f5fcb..d33c945 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -169,6 +169,7 @@ class kll_sketch {
     void update(const T& value);
     void update(T&& value);
     void merge(const kll_sketch& other);
+    void merge(kll_sketch&& other);
     bool is_empty() const;
     uint64_t get_n() const;
     uint32_t get_num_retained() const;
@@ -208,12 +209,6 @@ class kll_sketch {
       return size;
     }
 
-    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
-    // this method is for the user's convenience to predict the sketch size before serialization
-    // and is not used in the serialization and deserialization code
-    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
-    static size_t get_max_serialized_size_bytes(uint16_t k, uint64_t n);
-
     void serialize(std::ostream& os) const;
     typedef vector_u8<A> vector_bytes; // alias for users
     vector_bytes serialize(unsigned header_size_bytes = 0) const;
@@ -287,7 +282,8 @@ class kll_sketch {
     kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size);
 
     // common update code
-    inline uint32_t internal_update(const T& value);
+    inline void update_min_max(const T& value);
+    inline uint32_t internal_update();
 
     // The following code is only valid in the special case of exactly reaching capacity while updating.
     // It cannot be used while merging, while reducing k, or anything else.
@@ -302,8 +298,9 @@ class kll_sketch {
         const T* split_points, uint32_t size, double* buckets) const;
     void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
         const T* split_points, uint32_t size, double* buckets) const;
-    void merge_higher_levels(const kll_sketch& other, uint64_t final_n);
+    template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
     void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+    void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
     void assert_correct_total_weight() const;
     uint32_t safe_level_size(uint8_t level) const;
     uint32_t get_num_retained_above_level_zero() const;
diff --git a/kll/include/kll_sketch_impl.hpp b/kll/include/kll_sketch_impl.hpp
index efd76e2..55a97f6 100644
--- a/kll/include/kll_sketch_impl.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -153,18 +153,20 @@ kll_sketch<T, C, S, A>::~kll_sketch() {
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update(const T& value) {
-  const uint32_t index = internal_update(value);
+  update_min_max(value);
+  const uint32_t index = internal_update();
   new (&items_[index]) T(value);
 }
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update(T&& value) {
-  const uint32_t index = internal_update(value);
+  update_min_max(value);
+  const uint32_t index = internal_update();
   new (&items_[index]) T(std::move(value));
 }
 
 template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
+void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
   if (is_empty()) {
     min_value_ = new (A().allocate(1)) T(value);
     max_value_ = new (A().allocate(1)) T(value);
@@ -172,6 +174,10 @@ uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
     if (C()(value, *min_value_)) *min_value_ = value;
     if (C()(*max_value_, value)) *max_value_ = value;
   }
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::internal_update() {
   if (levels_[0] == 0) compress_while_updating();
   n_++;
   is_level_zero_sorted_ = false;
@@ -184,10 +190,6 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
   if (m_ != other.m_) {
     throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
   }
-  const uint64_t final_n = n_ + other.n_;
-  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
-    update(other.items_[i]);
-  }
   if (is_empty()) {
     min_value_ = new (A().allocate(1)) T(*other.min_value_);
     max_value_ = new (A().allocate(1)) T(*other.max_value_);
@@ -195,6 +197,11 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
     if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
     if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
   }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(other.items_[i]);
+  }
   if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
   n_ = final_n;
   if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
@@ -202,6 +209,30 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
 }
 
 template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(kll_sketch&& other) {
+  if (other.is_empty()) return;
+  if (m_ != other.m_) {
+    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+  }
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
+    max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
+  }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(std::move(other.items_[i]));
+  }
+  if (other.num_levels_ >= 2) merge_higher_levels(std::forward<kll_sketch>(other), final_n);
+  n_ = final_n;
+  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
+  assert_correct_total_weight();
+}
+
+template<typename T, typename C, typename S, typename A>
 bool kll_sketch<T, C, S, A>::is_empty() const {
   return n_ == 0;
 }
@@ -734,7 +765,8 @@ void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index,
 }
 
 template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
+template<typename O>
+void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
   const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
   auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
   const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
@@ -746,7 +778,7 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64
 
   const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
 
-  populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);
+  populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.get(), provisional_num_levels);
 
   const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
       worklevels.get(), outlevels.get(), is_level_zero_sorted_);
@@ -776,6 +808,7 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64
 }
 
 // this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version copies objects from the incoming sketch
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
   worklevels[0] = 0;
@@ -799,6 +832,31 @@ void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* wo
   }
 }
 
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version moves objects from the incoming sketch
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+  worklevels[0] = 0;
+
+  // the level zero data from "other" was already inserted into "this"
+  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+  worklevels[1] = safe_level_size(0);
+
+  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+    const uint32_t self_pop = safe_level_size(lvl);
+    const uint32_t other_pop = other.safe_level_size(lvl);
+    worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+    if ((self_pop > 0) and (other_pop == 0)) {
+      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+    } else if ((self_pop == 0) and (other_pop > 0)) {
+      kll_helper::move_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl], false);
+    } else if ((self_pop > 0) and (other_pop > 0)) {
+      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+    }
+  }
+}
+
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
   const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_));
diff --git a/kll/test/kll_sketch_custom_type_test.cpp b/kll/test/kll_sketch_custom_type_test.cpp
index de3b193..eeee0a5 100644
--- a/kll/test/kll_sketch_custom_type_test.cpp
+++ b/kll/test/kll_sketch_custom_type_test.cpp
@@ -36,6 +36,7 @@ class kll_sketch_custom_type_test: public CppUnit::TestFixture {
   CPPUNIT_TEST(merge_small);
   CPPUNIT_TEST(merge_higher_levels);
   CPPUNIT_TEST(serialize_deserialize);
+  CPPUNIT_TEST(moving_merge);
   CPPUNIT_TEST_SUITE_END();
 
 public:
@@ -150,6 +151,17 @@ public:
     CPPUNIT_ASSERT_EQUAL(sketch1.get_rank(n / 2), sketch2.get_rank(n / 2));
   }
 
+  void moving_merge() {
+    kll_test_type_sketch sketch1(8);
+    for (int i = 0; i < 10; i++) sketch1.update(i);
+    kll_test_type_sketch sketch2(8);
+    sketch2.update(10);
+    sketch2.merge(std::move(sketch1));
+    CPPUNIT_ASSERT_EQUAL(0, sketch2.get_min_value().get_value());
+    CPPUNIT_ASSERT_EQUAL(10, sketch2.get_max_value().get_value());
+    CPPUNIT_ASSERT_EQUAL(11, (int) sketch2.get_n());
+  }
+
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(kll_sketch_custom_type_test);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org