You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2019/07/20 00:19:19 UTC

[incubator-datasketches-cpp] 01/01: no default construction of items

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch kll_no_default_construction
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 8e94cf03fb6be96f19560e0cfe463f8a4a6c7e9e
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Fri Jul 19 17:18:59 2019 -0700

    no default construction of items
---
 kll/include/kll_helper.hpp         | 209 ++++++++++++++++++-------------
 kll/include/kll_sketch.hpp         | 246 +++++++++++++++++++++----------------
 kll/test/kll_sketch_validation.cpp |   2 +-
 3 files changed, 263 insertions(+), 194 deletions(-)

diff --git a/kll/include/kll_helper.hpp b/kll/include/kll_helper.hpp
index c7f04fd..e39cb7a 100644
--- a/kll/include/kll_helper.hpp
+++ b/kll/include/kll_helper.hpp
@@ -23,10 +23,11 @@
 #include <random>
 #include <stdexcept>
 #include <algorithm>
+#include <chrono>
 
 namespace datasketches {
 
-static std::independent_bits_engine<std::mt19937, 1, uint32_t> random_bit;
+static std::independent_bits_engine<std::mt19937, 1, uint32_t> random_bit(std::chrono::system_clock::now().time_since_epoch().count());
 
 #ifdef KLL_VALIDATION
 extern uint32_t kll_next_offset;
@@ -51,7 +52,7 @@ class kll_helper {
 
     static uint8_t floor_of_log2_of_fraction(uint64_t numer, uint64_t denom) {
       if (denom > numer) return 0;
-      uint8_t count(0);
+      uint8_t count = 0;
       while (true) {
         denom <<= 1;
         if (denom > numer) return count;
@@ -65,7 +66,7 @@ class kll_helper {
     }
 
     static uint32_t compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels) {
-      uint32_t total(0);
+      uint32_t total = 0;
       for (uint8_t h = 0; h < num_levels; h++) {
         total += level_capacity(k, num_levels, h, m);
       }
@@ -74,31 +75,31 @@ class kll_helper {
 
     static uint32_t level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid) {
       if (height >= numLevels) throw std::invalid_argument("height >= numLevels");
-      const uint8_t depth(numLevels - height - 1);
+      const uint8_t depth = numLevels - height - 1;
       return std::max((uint32_t) min_wid, int_cap_aux(k, depth));
     }
 
     static uint32_t int_cap_aux(uint16_t k, uint8_t depth) {
       if (depth > 60) throw std::invalid_argument("depth > 60");
       if (depth <= 30) return int_cap_aux_aux(k, depth);
-      const uint8_t half(depth / 2);
-      const uint8_t rest(depth - half);
-      const uint32_t tmp(int_cap_aux_aux(k, half));
+      const uint8_t half = depth / 2;
+      const uint8_t rest = depth - half;
+      const uint32_t tmp = int_cap_aux_aux(k, half);
       return int_cap_aux_aux(tmp, rest);
     }
 
     static uint32_t int_cap_aux_aux(uint16_t k, uint8_t depth) {
       if (depth > 30) throw std::invalid_argument("depth > 30");
-      const uint64_t twok(k << 1); // for rounding, we pre-multiply by 2
-      const uint64_t tmp((uint64_t) (((uint64_t) twok << depth) / powers_of_three[depth]));
-      const uint64_t result((tmp + 1) >> 1); // then here we add 1 and divide by 2
+      const uint64_t twok = k << 1; // for rounding, we pre-multiply by 2
+      const uint64_t tmp = (uint64_t) (((uint64_t) twok << depth) / powers_of_three[depth]);
+      const uint64_t result = (tmp + 1) >> 1; // then here we add 1 and divide by 2
       if (result > k) throw std::logic_error("result > k");
       return result;
     }
 
     static uint64_t sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels) {
-      uint64_t total(0);
-      uint64_t weight(1);
+      uint64_t total = 0;
+      uint64_t weight = 1;
       for (uint8_t lvl = 0; lvl < num_levels; lvl++) {
         total += weight * (levels[lvl + 1] - levels[lvl]);
         weight *= 2;
@@ -141,15 +142,15 @@ class kll_helper {
     template <typename T>
     static void randomly_halve_down(T* buf, uint32_t start, uint32_t length) {
       if (!is_even(length)) throw std::invalid_argument("length must be even");
-      const uint32_t half_length(length / 2);
+      const uint32_t half_length = length / 2;
     #ifdef KLL_VALIDATION
-      const uint32_t offset(deterministic_offset());
+      const uint32_t offset = deterministic_offset();
     #else
-      const uint32_t offset(random_bit());
+      const uint32_t offset = random_bit();
     #endif
-      uint32_t j(start + offset);
+      uint32_t j = start + offset;
       for (uint32_t i = start; i < (start + half_length); i++) {
-        buf[i] = std::move(buf[j]);
+        if (i != j) buf[i] = std::move(buf[j]);
         j += 2;
       }
     }
@@ -157,42 +158,70 @@ class kll_helper {
     template <typename T>
     static void randomly_halve_up(T* buf, uint32_t start, uint32_t length) {
       if (!is_even(length)) throw std::invalid_argument("length must be even");
-      const uint32_t half_length(length / 2);
+      const uint32_t half_length = length / 2;
     #ifdef KLL_VALIDATION
-      const uint32_t offset(deterministic_offset());
+      const uint32_t offset = deterministic_offset();
     #else
-      const uint32_t offset(random_bit());
+      const uint32_t offset = random_bit();
     #endif
-      uint32_t j((start + length) - 1 - offset);
+      uint32_t j = (start + length) - 1 - offset;
       for (uint32_t i = (start + length) - 1; i >= (start + half_length); i--) {
-        buf[i] = std::move(buf[j]);
+        if (i != j) buf[i] = std::move(buf[j]);
         j -= 2;
       }
     }
 
+    // this version assumes that destination has initialized objects
+    // moves objects from both buf_a and buf_b
+    // does not destroy the originals after the move
     template <typename T, typename C>
     static void merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
-      const uint32_t len_c(len_a + len_b);
-      const uint32_t lim_a(start_a + len_a);
-      const uint32_t lim_b(start_b + len_b);
-      const uint32_t lim_c(start_c + len_c);
+      const uint32_t len_c = len_a + len_b;
+      const uint32_t lim_a = start_a + len_a;
+      const uint32_t lim_b = start_b + len_b;
+      const uint32_t lim_c = start_c + len_c;
 
-      uint32_t a(start_a);
-      uint32_t b(start_b);
+      uint32_t a = start_a;
+      uint32_t b = start_b;
 
       for (uint32_t c = start_c; c < lim_c; c++) {
         if (a == lim_a) {
-          buf_c[c] = std::move(buf_b[b]);
-          b++;
+          buf_c[c] = std::move(buf_b[b++]);
         } else if (b == lim_b) {
-          buf_c[c] = std::move(buf_a[a]);
-          a++;
+          buf_c[c] = std::move(buf_a[a++]);
         } else if (C()(buf_a[a], buf_b[b])) {
-          buf_c[c] = std::move(buf_a[a]);
-          a++;
+          buf_c[c] = std::move(buf_a[a++]);
         } else {
-          buf_c[c] = std::move(buf_b[b]);
-          b++;
+          buf_c[c] = std::move(buf_b[b++]);
+        }
+      }
+      if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+    }
+
+    // this version initializes objects at the destination
+    // moves objects from buf_a and destroys the originals
+    // copies objects from buf_b
+    template <typename T, typename C>
+    static void merge_sorted_arrays_special(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
+      const uint32_t len_c = len_a + len_b;
+      const uint32_t lim_a = start_a + len_a;
+      const uint32_t lim_b = start_b + len_b;
+      const uint32_t lim_c = start_c + len_c;
+
+      uint32_t a = start_a;
+      uint32_t b = start_b;
+
+      for (uint32_t c = start_c; c < lim_c; c++) {
+        if (a == lim_a) {
+          new (&buf_c[c]) T(buf_b[b++]);
+        } else if (b == lim_b) {
+          new (&buf_c[c]) T(std::move(buf_a[a]));
+          buf_a[a++].~T();
+        } else if (C()(buf_a[a], buf_b[b])) {
+          new (&buf_c[c]) T(std::move(buf_a[a]));
+          buf_a[a++].~T();
+        } else {
+          new (&buf_c[c]) T(buf_b[b++]);
         }
       }
       if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
@@ -201,7 +230,7 @@ class kll_helper {
     struct compress_result {
       uint8_t final_num_levels;
       uint32_t final_capacity;
-      uint32_t final_pop;
+      uint32_t final_num_items;
     };
 
     /*
@@ -215,106 +244,118 @@ class kll_helper {
      *        halve down, then merge up.
      *   Adjust the boundaries of the level above.
      *
-     * It can be proved that generalCompress returns a sketch that satisfies the space constraints
+     * It can be proved that general_compress returns a sketch that satisfies the space constraints
      * no matter how much data is passed in.
-     * We are pretty sure that it works correctly when inBuf and outBuf are the same.
      * All levels except for level zero must be sorted before calling this, and will still be
      * sorted afterwards.
      * Level zero is not required to be sorted before, and may not be sorted afterwards.
-     *
-     * trashes inBuf and inLevels
-     * modifies outBuf and outLevels
-     *
-     * returns (finalNumLevels, finalCapacity, finalItemCount)
      */
     template <typename T, typename C>
-    static compress_result general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* in_buf,
-            uint32_t* in_levels, T* out_buf, uint32_t* out_levels, bool is_level_zero_sorted)
+    static compress_result general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+            uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted)
     {
       if (num_levels_in == 0) throw std::invalid_argument("num_levels_in == 0"); // things are too weird if zero levels are allowed
-      uint8_t num_levels(num_levels_in);
-      uint32_t current_item_count(in_levels[num_levels] - in_levels[0]); // decreases with each compaction
-      uint32_t target_item_count = compute_total_capacity(k, m, num_levels); // increases if we add levels
+      const uint32_t starting_item_count = in_levels[num_levels_in] - in_levels[0];
+      uint8_t current_num_levels = num_levels_in;
+      uint32_t current_item_count = starting_item_count; // decreases with each compaction
+      uint32_t target_item_count = compute_total_capacity(k, m, current_num_levels); // increases if we add levels
       bool done_yet = false;
       out_levels[0] = 0;
-      uint8_t cur_level = 0;
+      uint8_t current_level = 0;
       while (!done_yet) {
 
         // If we are at the current top level, add an empty level above it for convenience,
         // but do not increment num_levels until later
-        if (cur_level == (num_levels - 1)) {
-          in_levels[cur_level + 2] = in_levels[cur_level + 1];
+        if (current_level == (current_num_levels - 1)) {
+          in_levels[current_level + 2] = in_levels[current_level + 1];
         }
 
-        const auto raw_beg(in_levels[cur_level]);
-        const auto raw_lim(in_levels[cur_level + 1]);
-        const auto raw_pop(raw_lim - raw_beg);
+        const auto raw_beg = in_levels[current_level];
+        const auto raw_lim = in_levels[current_level + 1];
+        const auto raw_pop = raw_lim - raw_beg;
 
-        if ((current_item_count < target_item_count) or (raw_pop < level_capacity(k, num_levels, cur_level, m))) {
+        if ((current_item_count < target_item_count) or (raw_pop < level_capacity(k, current_num_levels, current_level, m))) {
           // move level over as is
-          // because in_buf and out_buf could be the same, make sure we are not moving data upwards!
-          if (raw_beg < out_levels[cur_level]) throw std::logic_error("wrong move");
-          std::move(&in_buf[raw_beg], &in_buf[raw_lim], &out_buf[out_levels[cur_level]]);
-          out_levels[cur_level + 1] = out_levels[cur_level] + raw_pop;
+          // make sure we are not moving data upwards
+          if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move");
+          std::move(&items[raw_beg], &items[raw_lim], &items[out_levels[current_level]]);
+          out_levels[current_level + 1] = out_levels[current_level] + raw_pop;
         } else {
           // The sketch is too full AND this level is too full, so we compact it
           // Note: this can add a level and thus change the sketches capacities
 
-          const auto pop_above(in_levels[cur_level + 2] - raw_lim);
-          const bool odd_pop(is_odd(raw_pop));
-          const auto adj_beg(odd_pop ? 1 + raw_beg : raw_beg);
-          const auto adj_pop(odd_pop ? raw_pop - 1 : raw_pop);
-          const auto half_adj_pop(adj_pop / 2);
+          const auto pop_above = in_levels[current_level + 2] - raw_lim;
+          const bool odd_pop = is_odd(raw_pop);
+          const auto adj_beg = odd_pop ? 1 + raw_beg : raw_beg;
+          const auto adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+          const auto half_adj_pop = adj_pop / 2;
 
           if (odd_pop) { // copy one guy over
-            out_buf[out_levels[cur_level]] = in_buf[raw_beg];
-            out_levels[cur_level + 1] = out_levels[cur_level] + 1;
+            items[out_levels[current_level]] = std::move(items[raw_beg]);
+            out_levels[current_level + 1] = out_levels[current_level] + 1;
           } else { // copy zero guys over
-            out_levels[cur_level + 1] = out_levels[cur_level];
+            out_levels[current_level + 1] = out_levels[current_level];
           }
 
           // level zero might not be sorted, so we must sort it if we wish to compact it
-          if ((cur_level == 0) and !is_level_zero_sorted) {
-            std::sort(&in_buf[adj_beg], &in_buf[adj_beg + adj_pop]);
+          if ((current_level == 0) and !is_level_zero_sorted) {
+            std::sort(&items[adj_beg], &items[adj_beg + adj_pop], C());
           }
 
           if (pop_above == 0) { // Level above is empty, so halve up
-            randomly_halve_up(in_buf, adj_beg, adj_pop);
+            randomly_halve_up(items, adj_beg, adj_pop);
           } else { // Level above is nonempty, so halve down, then merge up
-            randomly_halve_down(in_buf, adj_beg, adj_pop);
-            merge_sorted_arrays<T, C>(in_buf, adj_beg, half_adj_pop, in_buf, raw_lim, pop_above, in_buf, adj_beg + half_adj_pop);
+            randomly_halve_down(items, adj_beg, adj_pop);
+            merge_sorted_arrays<T, C>(items, adj_beg, half_adj_pop, items, raw_lim, pop_above, items, adj_beg + half_adj_pop);
           }
 
           // track the fact that we just eliminated some data
           current_item_count -= half_adj_pop;
 
           // adjust the boundaries of the level above
-          in_levels[cur_level + 1] = in_levels[cur_level + 1] - half_adj_pop;
+          in_levels[current_level + 1] = in_levels[current_level + 1] - half_adj_pop;
 
-          // Increment numLevels if we just compacted the old top level
-          // This creates some more capacity (the size of the new bottom level)
-          if (cur_level == (num_levels - 1)) {
-            num_levels++;
-            target_item_count += level_capacity(k, num_levels, 0, m);
+          // increment num_levels if we just compacted the old top level
+          // this creates some more capacity (the size of the new bottom level)
+          if (current_level == (current_num_levels - 1)) {
+            current_num_levels++;
+            target_item_count += level_capacity(k, current_num_levels, 0, m);
           }
 
         } // end of code for compacting a level
 
         // determine whether we have processed all levels yet (including any new levels that we created)
 
-        if (cur_level == (num_levels - 1)) done_yet = true;
-        cur_level++;
+        if (current_level == (current_num_levels - 1)) done_yet = true;
+        current_level++;
       } // end of loop over levels
 
-      if ((out_levels[num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
+      if ((out_levels[current_num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
+
+      for (uint32_t i = current_item_count; i < starting_item_count; i++) items[i].~T();
 
       compress_result result;
-      result.final_num_levels = num_levels;
+      result.final_num_levels = current_num_levels;
       result.final_capacity = target_item_count;
-      result.final_pop = current_item_count;
+      result.final_num_items = current_item_count;
       return result;
     }
 
+    template<typename T>
+    static void copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
+      while (src_first != src_last) {
+        new (&dst[dst_first++]) T(src[src_first++]);
+      }
+    }
+
+    template<typename T>
+    static void move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
+      while (src_first != src_last) {
+        new (&dst[dst_first++]) T(std::move(src[src_first]));
+        src[src_first++].~T();
+      }
+    }
+
 #ifdef KLL_VALIDATION
   private:
 
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index 9d65b82..64217c6 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -162,6 +162,7 @@ class kll_sketch {
     ~kll_sketch();
     kll_sketch& operator=(kll_sketch other);
     void update(const T& value);
+    void update(T&& value);
     void merge(const kll_sketch& other);
     bool is_empty() const;
     uint64_t get_n() const;
@@ -196,8 +197,8 @@ class kll_sketch {
       }
       // the last integer in the levels_ array is not serialized because it can be derived
       uint32_t size = DATA_START + num_levels_ * sizeof(uint32_t);
-      size += S().size_of_item(min_value_);
-      size += S().size_of_item(max_value_);
+      size += S().size_of_item(*min_value_);
+      size += S().size_of_item(*max_value_);
       for (auto& it: *this) size += S().size_of_item(it.first);
       return size;
     }
@@ -267,8 +268,8 @@ class kll_sketch {
     uint8_t levels_size_;
     T* items_;
     uint32_t items_size_;
-    T min_value_;
-    T max_value_;
+    T* min_value_;
+    T* max_value_;
     bool is_level_zero_sorted_;
 
     // for deserialization
@@ -279,6 +280,9 @@ class kll_sketch {
     // the common part of the preamble was read and compatibility checks were done
     kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size);
 
+    // common update code
+    uint32_t internal_update(const T& value);
+
     // The following code is only valid in the special case of exactly reaching capacity while updating.
     // It cannot be used while merging, while reducing k, or anything else.
     void compress_while_updating(void);
@@ -340,11 +344,8 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k) {
   levels_ = new (AllocU32().allocate(2)) uint32_t[2] {k_, k_};
   items_size_ = k_;
   items_ = A().allocate(items_size_);
-  for (unsigned i = 0; i < items_size_; i++) A().construct(&items_[i], T());
-  if (std::is_floating_point<T>::value) {
-    min_value_ = std::numeric_limits<T>::quiet_NaN();
-    max_value_ = std::numeric_limits<T>::quiet_NaN();
-  }
+  min_value_ = A().allocate(1);
+  max_value_ = A().allocate(1);
   is_level_zero_sorted_ = false;
 }
 
@@ -360,9 +361,11 @@ kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other) {
   std::copy(&other.levels_[0], &other.levels_[levels_size_], levels_);
   items_size_ = other.items_size_;
   items_ = A().allocate(items_size_);
-  for (unsigned i = 0; i < items_size_; i++) A().construct(&items_[i], other.items_[i]);
-  min_value_ = other.min_value_;
-  max_value_ = other.max_value_;
+  std::copy(other.items_, &other.items_[items_size_], items_);
+  min_value_ = A().allocate(1);
+  max_value_ = A().allocate(1);
+  new (min_value_) T(*other.min_value_);
+  new (max_value_) T(*other.max_value_);
   is_level_zero_sorted_ = other.is_level_zero_sorted_;
 }
 
@@ -385,26 +388,46 @@ kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch other) {
 
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>::~kll_sketch() {
+  const uint32_t begin = levels_[0];
+  const uint32_t end = begin + get_num_retained();
+  for (uint32_t i = begin; i < end; i++) items_[i].~T();
+  if (!is_empty()) {
+    min_value_->~T();
+    max_value_->~T();
+  }
   AllocU32().deallocate(levels_, levels_size_);
-  for (unsigned i = 0; i < items_size_; i++) A().destroy(&items_[i]);
   A().deallocate(items_, items_size_);
+  A().deallocate(min_value_, 1);
+  A().deallocate(max_value_, 1);
 }
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::update(const T& value) {
+  const uint32_t next_pos = internal_update(value);
+  new (&items_[next_pos]) T(value);
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update(T&& value) {
+  const uint32_t next_pos = internal_update(value);
+  new (&items_[next_pos]) T(std::move(value));
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
   if (is_empty()) {
-    min_value_ = value;
-    max_value_ = value;
+    new (min_value_) T(value);
+    new (max_value_) T(value);
   } else {
-    if (C()(value, min_value_)) min_value_ = value;
-    if (C()(max_value_, value)) max_value_ = value;
+    if (C()(value, *min_value_)) *min_value_ = value;
+    if (C()(*max_value_, value)) *max_value_ = value;
   }
   if (levels_[0] == 0) compress_while_updating();
   n_++;
   is_level_zero_sorted_ = false;
   const uint32_t next_pos(levels_[0] - 1);
   levels_[0] = next_pos;
-  items_[next_pos] = value;
+  return next_pos;
 }
 
 template<typename T, typename C, typename S, typename A>
@@ -413,19 +436,20 @@ void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
   if (m_ != other.m_) {
     throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
   }
-  const uint64_t final_n(n_ + other.n_);
+  const uint64_t final_n = n_ + other.n_;
   for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
     update(other.items_[i]);
   }
-  if (other.num_levels_ >= 2) {
-    merge_higher_levels(other, final_n);
+  if (is_empty()) {
+    new (min_value_) T(*other.min_value_);
+    new (max_value_) T(*other.max_value_);
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
   }
+  if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
   n_ = final_n;
-  if (other.is_estimation_mode()) {
-    min_k_ = std::min(min_k_, other.min_k_);
-  }
-  if (is_empty() or C()(other.min_value_, min_value_)) min_value_ = other.min_value_;
-  if (is_empty() or C()(max_value_, other.max_value_)) max_value_ = other.max_value_;
+  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
   assert_correct_total_weight();
 }
 
@@ -457,7 +481,7 @@ T kll_sketch<T, C, S, A>::get_min_value() const {
     }
     throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
   }
-  return min_value_;
+  return *min_value_;
 }
 
 template<typename T, typename C, typename S, typename A>
@@ -468,7 +492,7 @@ T kll_sketch<T, C, S, A>::get_max_value() const {
     }
     throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
   }
-  return max_value_;
+  return *max_value_;
 }
 
 template<typename T, typename C, typename S, typename A>
@@ -479,8 +503,8 @@ T kll_sketch<T, C, S, A>::get_quantile(double fraction) const {
     }
     throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
   }
-  if (fraction == 0.0) return min_value_;
-  if (fraction == 1.0) return max_value_;
+  if (fraction == 0.0) return *min_value_;
+  if (fraction == 1.0) return *max_value_;
   if ((fraction < 0.0) or (fraction > 1.0)) {
     throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
   }
@@ -499,8 +523,8 @@ std::unique_ptr<T[]> kll_sketch<T, C, S, A>::get_quantiles(const double* fractio
     if ((fraction < 0.0) or (fraction > 1.0)) {
       throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
     }
-    if      (fraction == 0.0) quantiles[i] = min_value_;
-    else if (fraction == 1.0) quantiles[i] = max_value_;
+    if      (fraction == 0.0) quantiles[i] = *min_value_;
+    else if (fraction == 1.0) quantiles[i] = *max_value_;
     else {
       if (!quantile_calculator) {
         // has side effect of sorting level zero if needed
@@ -575,8 +599,8 @@ void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
     os.write((char*)&num_levels_, sizeof(num_levels_));
     os.write((char*)&unused, sizeof(unused));
     os.write((char*)levels_, sizeof(levels_[0]) * num_levels_);
-    S().serialize(os, &min_value_, 1);
-    S().serialize(os, &max_value_, 1);
+    S().serialize(os, min_value_, 1);
+    S().serialize(os, max_value_, 1);
   }
   S().serialize(os, &items_[levels_[0]], get_num_retained());
 }
@@ -615,8 +639,8 @@ std::pair<void_ptr_with_deleter, const size_t> kll_sketch<T, C, S, A>::serialize
       copy_to_mem(&num_levels_, &ptr, sizeof(num_levels_));
       copy_to_mem(&unused, &ptr, sizeof(unused));
       copy_to_mem(levels_, &ptr, sizeof(levels_[0]) * num_levels_);
-      ptr += S().serialize(ptr, &min_value_, 1);
-      ptr += S().serialize(ptr, &max_value_, 1);
+      ptr += S().serialize(ptr, min_value_, 1);
+      ptr += S().serialize(ptr, max_value_, 1);
     }
     ptr += S().serialize(ptr, &items_[levels_[0]], get_num_retained());
   }
@@ -717,9 +741,11 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream&
     is.read((char*)levels_, sizeof(levels_[0]) * num_levels_);
   }
   levels_[num_levels_] = capacity;
+  min_value_ = A().allocate(1);
+  max_value_ = A().allocate(1);
   if (!is_single_item) {
-    S().deserialize(is, &min_value_, 1);
-    S().deserialize(is, &max_value_, 1);
+    S().deserialize(is, min_value_, 1);
+    S().deserialize(is, max_value_, 1);
   }
   items_ = A().allocate(capacity);
   items_size_ = capacity;
@@ -727,8 +753,8 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream&
   const auto num_items = levels_[num_levels_] - levels_[0];
   S().deserialize(is, &items_[levels_[0]], num_items);
   if (is_single_item) {
-    min_value_ = items_[levels_[0]];
-    max_value_ = items_[levels_[0]];
+    new (min_value_) T(items_[levels_[0]]);
+    new (max_value_) T(items_[levels_[0]]);
   }
   is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
 }
@@ -761,9 +787,11 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* b
     copy_from_mem(&ptr, levels_, sizeof(levels_[0]) * num_levels_);
   }
   levels_[num_levels_] = capacity;
+  min_value_ = A().allocate(1);
+  max_value_ = A().allocate(1);
   if (!is_single_item) {
-    ptr += S().deserialize(ptr, &min_value_, 1);
-    ptr += S().deserialize(ptr, &max_value_, 1);
+    ptr += S().deserialize(ptr, min_value_, 1);
+    ptr += S().deserialize(ptr, max_value_, 1);
   }
   items_ = A().allocate(capacity);
   items_size_ = capacity;
@@ -771,8 +799,8 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* b
   const auto num_items(levels_[num_levels_] - levels_[0]);
   ptr += S().deserialize(ptr, &items_[levels_[0]], num_items);
   if (is_single_item) {
-    min_value_ = items_[levels_[0]];
-    max_value_ = items_[levels_[0]];
+    new (min_value_) T(items_[levels_[0]]);
+    new (max_value_) T(items_[levels_[0]]);
   }
   is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
   if (ptr != static_cast<const char*>(bytes) + size) throw std::logic_error("deserialized size mismatch");
@@ -782,27 +810,28 @@ kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* b
 // It cannot be used while merging, while reducing k, or anything else.
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::compress_while_updating(void) {
-  const uint8_t level(find_level_to_compact());
+  const uint8_t level = find_level_to_compact();
 
   // It is important to add the new top level right here. Be aware that this operation
   // grows the buffer and shifts the data and also the boundaries of the data and grows the
-  // levels array and increments numLevels_
+  // levels array and increments num_levels_
   if (level == (num_levels_ - 1)) {
     add_empty_top_level_to_completely_full_sketch();
   }
 
-  const uint32_t raw_beg(levels_[level]);
-  const uint32_t raw_lim(levels_[level + 1]);
+  const uint32_t raw_beg = levels_[level];
+  const uint32_t raw_lim = levels_[level + 1];
   // +2 is OK because we already added a new top level if necessary
-  const uint32_t pop_above(levels_[level + 2] - raw_lim);
-  const uint32_t raw_pop(raw_lim - raw_beg);
-  const bool odd_pop(kll_helper::is_odd(raw_pop));
-  const uint32_t adj_beg(odd_pop ? raw_beg + 1 : raw_beg);
-  const uint32_t adj_pop(odd_pop ? raw_pop - 1 : raw_pop);
-  const uint32_t half_adj_pop(adj_pop / 2);
+  const uint32_t pop_above = levels_[level + 2] - raw_lim;
+  const uint32_t raw_pop = raw_lim - raw_beg;
+  const bool odd_pop = kll_helper::is_odd(raw_pop);
+  const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
+  const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+  const uint32_t half_adj_pop = adj_pop / 2;
 
   // level zero might not be sorted, so we must sort it if we wish to compact it
-  if (level == 0) {
+  // sort_level_zero() is not used here because of the adjustment for odd number of items
+  if ((level == 0) and !is_level_zero_sorted_) {
     std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop], C());
   }
   if (pop_above == 0) {
@@ -814,32 +843,36 @@ void kll_sketch<T, C, S, A>::compress_while_updating(void) {
   levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
   if (odd_pop) {
     levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
-    items_[levels_[level]] = items_[raw_beg]; // namely this leftover guy
+    items_[levels_[level]] = std::move(items_[raw_beg]); // namely this leftover guy
   } else {
     levels_[level] = levels_[level + 1]; // the current level is now empty
   }
 
-  // verify that we freed up halfAdjPop array slots just below the current level
+  // verify that we freed up half_adj_pop array slots just below the current level
   assert (levels_[level] == (raw_beg + half_adj_pop));
 
   // finally, we need to shift up the data in the levels below
   // so that the freed-up space can be used by level zero
   if (level > 0) {
-    const uint32_t amount(raw_beg - levels_[0]);
-    std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
-    for (uint8_t lvl = 0; lvl < level; lvl++) {
-      levels_[lvl] += half_adj_pop;
+    const uint32_t amount = raw_beg - levels_[0];
+    const uint32_t first = levels_[0];
+    uint32_t last = levels_[0] + amount;
+    while (first != last) {
+      --last;
+      items_[last + half_adj_pop] = std::move(items_[last]);
     }
+    for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
   }
+  for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + raw_beg].~T();
 }
 
 template<typename T, typename C, typename S, typename A>
 uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
-  uint8_t level(0);
+  uint8_t level = 0;
   while (true) {
     assert (level < num_levels_);
-    const uint32_t pop(levels_[level + 1] - levels_[level]);
-    const uint32_t cap(kll_helper::level_capacity(k_, num_levels_, level, m_));
+    const uint32_t pop = levels_[level + 1] - levels_[level];
+    const uint32_t cap = kll_helper::level_capacity(k_, num_levels_, level, m_);
     if (pop >= cap) {
       return level;
     }
@@ -849,30 +882,31 @@ uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::add_empty_top_level_to_completely_full_sketch() {
-  const uint32_t cur_total_cap(levels_[num_levels_]);
+  const uint32_t cur_total_cap = levels_[num_levels_];
 
   // make sure that we are following a certain growth scheme
   assert (levels_[0] == 0);
   assert (items_size_ == cur_total_cap);
 
   // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
-  if (levels_size_ < (num_levels_ + 2)) {
-    uint32_t* new_levels(AllocU32().allocate(num_levels_ + 2));
+  const uint8_t new_levels_size = num_levels_ + 2;
+  if (levels_size_ < (new_levels_size)) {
+    uint32_t* new_levels = AllocU32().allocate(new_levels_size);
     std::copy(&levels_[0], &levels_[levels_size_], new_levels);
     AllocU32().deallocate(levels_, levels_size_);
     levels_ = new_levels;
-    levels_size_ = num_levels_ + 2;
+    levels_size_ = new_levels_size;
   }
 
-  const uint32_t delta_cap(kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_));
-  const uint32_t new_total_cap(cur_total_cap + delta_cap);
-
-  T* new_buf(A().allocate(new_total_cap));
-  for (unsigned i = 0; i < new_total_cap; i++) A().construct(&new_buf[i], T());
+  const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
+  const uint32_t new_total_cap = cur_total_cap + delta_cap;
 
   // move (and shift) the current data into the new buffer
-  std::move(&items_[levels_[0]], &items_[levels_[0] + cur_total_cap], &new_buf[levels_[0] + delta_cap]);
-  for (unsigned i = 0; i < items_size_; i++) A().destroy(&items_[i]);
+  T* new_buf = A().allocate(new_total_cap);
+  for (uint32_t i = 0; i < cur_total_cap; i++) {
+    new (&new_buf[i + delta_cap]) T(std::move(items_[i]));
+    items_[i].~T();
+  }
   A().deallocate(items_, items_size_);
   items_ = new_buf;
   items_size_ = new_total_cap;
@@ -976,8 +1010,9 @@ void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index,
 
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
-  const uint32_t tmp_space_needed(get_num_retained() + other.get_num_retained_above_level_zero());
-  const std::unique_ptr<T[]> workbuf(new T[tmp_space_needed]);
+  const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
+  auto deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); };
+  const std::unique_ptr<T, decltype(deleter)> workbuf(A().allocate(tmp_num_items), deleter);
   const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
   const std::unique_ptr<uint32_t[]> worklevels(new uint32_t[ub + 2]); // ub+1 does not work
   const std::unique_ptr<uint32_t[]> outlevels(new uint32_t[ub + 2]);
@@ -986,48 +1021,42 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64
 
   populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);
 
-  // notice that workbuf is being used as both the input and output here
   const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
-      worklevels.get(), workbuf.get(), outlevels.get(), is_level_zero_sorted_);
-  const uint8_t final_num_levels = result.final_num_levels;
-  const uint32_t final_capacity = result.final_capacity;
-  const uint32_t final_pop = result.final_pop;
+      worklevels.get(), outlevels.get(), is_level_zero_sorted_);
 
-  assert (final_num_levels <= ub); // can sometimes be much bigger
+  // ub can sometimes be much bigger
+  if (result.final_num_levels > ub) throw std::logic_error("merge error");
 
-  // now we need to transfer the results back into the "self" sketch
-  if (final_capacity != items_size_) {
+  // now we need to transfer the results back into "this" sketch
+  if (result.final_capacity != items_size_) {
     for (unsigned i = 0; i < items_size_; i++) A().destroy(&items_[i]);
     A().deallocate(items_, items_size_);
-    items_ = A().allocate(final_capacity);
-    items_size_ = final_capacity;
-    for (unsigned i = 0; i < items_size_; i++) A().construct(&items_[i], T());
+    items_size_ = result.final_capacity;
+    items_ = A().allocate(items_size_);
   }
-  const uint32_t free_space_at_bottom = final_capacity - final_pop;
-  std::move(&workbuf[outlevels[0]], &workbuf[outlevels[0] + final_pop], &items_[free_space_at_bottom]);
-  const uint32_t the_shift(free_space_at_bottom - outlevels[0]);
+  const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
+  kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom);
 
-  if (levels_size_ < (final_num_levels + 1)) {
+  if (levels_size_ < (result.final_num_levels + 1)) {
     AllocU32().deallocate(levels_, levels_size_);
-    levels_ = AllocU32().allocate(final_num_levels + 1);
-    levels_size_ = final_num_levels + 1;
+    levels_size_ = result.final_num_levels + 1;
+    levels_ = AllocU32().allocate(levels_size_);
   }
-
-  for (uint8_t lvl = 0; lvl < (final_num_levels + 1); lvl++) { // includes the "extra" index
-    levels_[lvl] = outlevels[lvl] + the_shift;
+  const uint32_t offset = free_space_at_bottom - outlevels[0];
+  for (uint8_t lvl = 0; lvl < levels_size_; lvl++) { // includes the "extra" index
+    levels_[lvl] = outlevels[lvl] + offset;
   }
-
-  num_levels_ = final_num_levels;
+  num_levels_ = result.final_num_levels;
 }
 
+// this leaves items_ uninitialized (all objects moved out and destroyed)
 template<typename T, typename C, typename S, typename A>
 void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
   worklevels[0] = 0;
 
-  // Note: the level zero data from "other" was already inserted into "self"
-  const uint32_t self_pop_zero(safe_level_size(0));
-  std::move(&items_[levels_[0]], &items_[levels_[0] + self_pop_zero], &workbuf[worklevels[0]]);
-  worklevels[1] = worklevels[0] + self_pop_zero;
+  // the level zero data from "other" was already inserted into "this"
+  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0);
+  worklevels[1] = safe_level_size(0);
 
   for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
     const uint32_t self_pop = safe_level_size(lvl);
@@ -1035,11 +1064,11 @@ void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* wo
     worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
 
     if ((self_pop > 0) and (other_pop == 0)) {
-      std::move(&items_[levels_[lvl]], &items_[levels_[lvl] + self_pop], &workbuf[worklevels[lvl]]);
+      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl]);
     } else if ((self_pop == 0) and (other_pop > 0)) {
-      std::copy(&other.items_[other.levels_[lvl]], &other.items_[other.levels_[lvl] + other_pop], &workbuf[worklevels[lvl]]);
+      kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl]);
     } else if ((self_pop > 0) and (other_pop > 0)) {
-      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_,
+      kll_helper::merge_sorted_arrays_special<T, C>(items_, levels_[lvl], self_pop, other.items_,
           other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
     }
   }
@@ -1123,10 +1152,9 @@ void kll_sketch<T, C, S, A>::to_stream(std::ostream& os, bool print_levels, bool
   os << "   Capacity items : " << items_size_ << std::endl;
   os << "   Retained items : " << get_num_retained() << std::endl;
   os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
-  using std::to_string; // to allow overriding to_string() for a custom type
   if (!is_empty()) {
-    os << "   Min value      : " << get_min_value() << std::endl;
-    os << "   Max value      : " << get_max_value() << std::endl;
+    os << "   Min value      : " << *min_value_ << std::endl;
+    os << "   Max value      : " << *max_value_ << std::endl;
   }
   os << "### End sketch summary" << std::endl;
 
diff --git a/kll/test/kll_sketch_validation.cpp b/kll/test/kll_sketch_validation.cpp
index c15205d..5333936 100644
--- a/kll/test/kll_sketch_validation.cpp
+++ b/kll/test/kll_sketch_validation.cpp
@@ -185,7 +185,7 @@ class kll_sketch_validation: public CppUnit::TestFixture {
         std::cout << " pass" << std::endl;
       } else {
         std::cout << " " << (correct_results[7 * i + 6]) << " != " << hashed_samples;
-        std::cout << sketch;
+        sketch.to_stream(std::cout);
         CPPUNIT_FAIL("fail");
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org