You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/08/12 22:50:14 UTC

[incubator-datasketches-cpp] branch kll_merge_blocks_fix created (now 7cafbef)

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a change to branch kll_merge_blocks_fix
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git.


      at 7cafbef  rewritten quantile calculator

This branch includes the following new commits:

     new 7cafbef  rewritten quantile calculator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org


[incubator-datasketches-cpp] 01/01: rewritten quantile calculator

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch kll_merge_blocks_fix
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 7cafbef53f475b966c1984dce395a940c6cc93f5
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Wed Aug 12 15:49:52 2020 -0700

    rewritten quantile calculator
---
 kll/include/kll_quantile_calculator.hpp      |  37 +++---
 kll/include/kll_quantile_calculator_impl.hpp | 161 ++++++++++++---------------
 2 files changed, 93 insertions(+), 105 deletions(-)

diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index f77071e..bc60f26 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -26,31 +26,38 @@ namespace datasketches {
 
 template <typename T, typename C, typename A>
 class kll_quantile_calculator {
-  typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
-  typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
   public:
     // assumes that all levels are sorted including level 0
     kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
-    ~kll_quantile_calculator();
     T get_quantile(double fraction) const;
 
   private:
+    using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+    using vector_u32 = std::vector<uint32_t, AllocU32>;
+    using Entry = std::pair<T, uint64_t>;
+    using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
+    using Container = std::vector<Entry, AllocEntry>;
     uint64_t n_;
-    T* items_;
-    uint64_t* weights_;
-    uint32_t* levels_;
-    uint8_t levels_size_;
-    uint8_t num_levels_;
+    vector_u32 levels_;
+    Container entries_;
 
-    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels);
     T approximately_answer_positional_query(uint64_t pos) const;
-    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+    void convert_to_preceding_cummulative();
+    uint32_t chunk_containing_pos(uint64_t pos) const;
+    uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const;
+    static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items);
+    static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+    static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
     static uint64_t pos_of_phi(double phi, uint64_t n);
-    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
-    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
-    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
-    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
-    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
+
+    template<typename Comparator>
+    struct compare_pair_by_first {
+      template<typename Entry1, typename Entry2>
+      bool operator()(Entry1&& a, Entry2&& b) const {
+        return Comparator()(std::forward<Entry1>(a).first, std::forward<Entry2>(b).first);
+      }
+    };
 };
 
 } /* namespace datasketches */
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
index d4f4c04..77aeb05 100644
--- a/kll/include/kll_quantile_calculator_impl.hpp
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -22,31 +22,24 @@
 
 #include <memory>
 #include <cmath>
+#include <algorithm>
+
+#include <iostream>
 
 #include "kll_helper.hpp"
 
 namespace datasketches {
 
 template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
-  n_ = n;
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
+n_(n), levels_(num_levels + 1)
+{
   const uint32_t num_items = levels[num_levels] - levels[0];
-  items_ = A().allocate(num_items);
-  weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
-  levels_size_ = num_levels + 1;
-  levels_ = AllocU32().allocate(levels_size_);
-  populate_from_sketch(items, num_items, levels, num_levels);
-  blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
-  convert_to_preceding_cummulative(weights_, num_items + 1);
-}
-
-template <typename T, typename C, typename A>
-kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
-  const uint32_t num_items = levels_[num_levels_] - levels_[0];
-  for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
-  A().deallocate(items_, num_items);
-  AllocU64().deallocate(weights_, num_items + 1);
-  AllocU32().deallocate(levels_, levels_size_);
+  entries_.reserve(num_items);
+  populate_from_sketch(items, levels, num_levels);
+  merge_sorted_blocks(entries_, levels_.data(), levels_.size() - 1, num_items);
+  if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
+  convert_to_preceding_cummulative();
 }
 
 template <typename T, typename C, typename A>
@@ -55,17 +48,18 @@ T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
 }
 
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
-  kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
-  uint8_t src_level = 0;
-  uint8_t dst_level = 0;
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
+  size_t src_level = 0;
+  size_t dst_level = 0;
   uint64_t weight = 1;
   uint32_t offset = levels[0];
   while (src_level < num_levels) {
     const uint32_t from_index(levels[src_level] - offset);
     const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
     if (from_index < to_index) { // skip empty levels
-      std::fill(&weights_[from_index], &weights_[to_index], weight);
+      for (uint32_t i = from_index; i < to_index; ++i) {
+        entries_.push_back(Entry(items[i + offset], weight));
+      }
       levels_[dst_level] = from_index;
       levels_[dst_level + 1] = to_index;
       dst_level++;
@@ -73,24 +67,24 @@ void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint
     src_level++;
     weight *= 2;
   }
-  weights_[num_items] = 0;
-  num_levels_ = dst_level;
+  if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
 }
 
 template <typename T, typename C, typename A>
 T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
   if (pos >= n_) throw std::logic_error("position out of range");
-  const uint32_t weights_size(levels_[num_levels_] + 1);
-  const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
-  return items_[index];
+  const uint32_t num_items = levels_[levels_.size() - 1];
+  if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first;
+  const uint32_t index = chunk_containing_pos(pos);
+  return entries_[index].first;
 }
 
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
-  uint64_t subtotal(0);
-  for (uint32_t i = 0; i < weights_size; i++) {
-    const uint32_t new_subtotal = subtotal + weights[i];
-    weights[i] = subtotal;
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
+  uint64_t subtotal = 0;
+  for (auto& entry: entries_) {
+    const uint32_t new_subtotal = subtotal + entry.second;
+    entry.second = subtotal;
     subtotal = new_subtotal;
   }
 }
@@ -102,87 +96,74 @@ uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
 }
 
 template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
-  if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
-  const uint32_t nominal_length(weights_size - 1);
-  if (pos < weights[0]) throw std::logic_error("position too small");
-  if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
-  return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
+  if (entries_.size() < 1) throw std::logic_error("array too short");
+  if (pos < entries_[0].second) throw std::logic_error("position too small");
+  if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large");
+  return search_for_chunk_containing_pos(pos, 0, entries_.size());
 }
 
 template <typename T, typename C, typename A>
-uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const {
   if (l + 1 == r) {
     return l;
   }
   const uint32_t m(l + (r - l) / 2);
-  if (arr[m] <= pos) {
-    return search_for_chunk_containing_pos(arr, pos, m, r);
+  if (entries_[m].second <= pos) {
+    return search_for_chunk_containing_pos(pos, m, r);
   }
-  return search_for_chunk_containing_pos(arr, pos, l, m);
+  return search_for_chunk_containing_pos(pos, l, m);
 }
 
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
   if (num_levels == 1) return;
-
-  // move the input in preparation for the "ping-pong" reduction strategy
-  auto tmp_items_deleter = [num_items](T* ptr) {
-    for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
-    A().deallocate(ptr, num_items);
-  };
-  std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
-  kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
-  auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
-  std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
-  std::copy(weights, &weights[num_items], tmp_weights.get());
-  blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+  Container temporary;
+  temporary.reserve(num_items);
+  merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
 }
 
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
+    uint8_t starting_level, uint8_t num_levels) {
   if (num_levels == 1) return;
   const uint8_t num_levels_1 = num_levels / 2;
   const uint8_t num_levels_2 = num_levels - num_levels_1;
-  if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
-  if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
   const uint8_t starting_level_1 = starting_level;
   const uint8_t starting_level_2 = starting_level + num_levels_1;
-  // swap roles of src and dst
-  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
-  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
-  tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
+  const auto chunk_begin = temp.begin() + temp.size();
+  merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1);
+  merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2);
+  const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1];
+  std::merge(
+    std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1),
+    std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()),
+    orig.begin() + levels[starting_level], compare_pair_by_first<C>()
+  );
+  temp.erase(chunk_begin, temp.end());
 }
 
 template <typename T, typename C, typename A>
-void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
-  const auto from_index_1 = levels[starting_level_1];
-  const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
-  const auto from_index_2 = levels[starting_level_2];
-  const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
-  auto i_src_1 = from_index_1;
-  auto i_src_2 = from_index_2;
-  auto i_dst = from_index_1;
-
-  while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
-    if (C()(items_src[i_src_1], items_src[i_src_2])) {
-      items_dst[i_dst] = std::move(items_src[i_src_1]);
-      weights_dst[i_dst] = weights_src[i_src_1];
-      i_src_1++;
-    } else {
-      items_dst[i_dst] = std::move(items_src[i_src_2]);
-      weights_dst[i_dst] = weights_src[i_src_2];
-      i_src_2++;
-    }
-    i_dst++;
-  }
-  if (i_src_1 < to_index_1) {
-    std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
-    std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
-  } else if (i_src_2 < to_index_2) {
-    std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
-    std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
+void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
+    uint8_t starting_level, uint8_t num_levels) {
+  if (num_levels == 1) {
+    std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp));
+    return;
   }
+  const uint8_t num_levels_1 = num_levels / 2;
+  const uint8_t num_levels_2 = num_levels - num_levels_1;
+  const uint8_t starting_level_1 = starting_level;
+  const uint8_t starting_level_2 = starting_level + num_levels_1;
+  merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1);
+  merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2);
+  std::merge(
+    std::make_move_iterator(orig.begin() + levels[starting_level_1]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_2]),
+    std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]),
+    std::back_inserter(temp),
+    compare_pair_by_first<C>()
+  );
 }
 
 } /* namespace datasketches */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org