You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/11/10 03:23:50 UTC

[incubator-datasketches-cpp] branch req_sketch updated: serialized layout change

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git


The following commit(s) were added to refs/heads/req_sketch by this push:
     new 78ebd88  serialized layout change
78ebd88 is described below

commit 78ebd88135c7fc97bb1b394173d260f4e1d23712
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Nov 9 18:58:41 2020 -0800

    serialized layout change
---
 req/include/req_common.hpp         |  2 +-
 req/include/req_compactor.hpp      |  2 +-
 req/include/req_compactor_impl.hpp |  8 ++--
 req/include/req_sketch.hpp         |  6 +--
 req/include/req_sketch_impl.hpp    | 97 ++++++++++++++++++++++----------------
 5 files changed, 67 insertions(+), 48 deletions(-)

diff --git a/req/include/req_common.hpp b/req/include/req_common.hpp
index 1797a36..78ce505 100755
--- a/req/include/req_common.hpp
+++ b/req/include/req_common.hpp
@@ -29,7 +29,7 @@ namespace datasketches {
 static std::independent_bits_engine<std::mt19937, 1, unsigned> req_random_bit(std::chrono::system_clock::now().time_since_epoch().count());
 
 namespace req_constants {
-  static const uint32_t MIN_K = 4;
+  static const uint16_t MIN_K = 4;
   static const uint32_t INIT_NUM_SECTIONS = 3;
 }
 
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
index 6f97813..f939a19 100755
--- a/req/include/req_compactor.hpp
+++ b/req/include/req_compactor.hpp
@@ -30,7 +30,7 @@ typename Allocator
 >
 class req_compactor {
 public:
-  req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator);
+  req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, const T* item_ptr = nullptr, bool sorted = true);
 
   bool is_sorted() const;
   uint32_t get_num_items() const;
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
index d803832..dc1f87e 100755
--- a/req/include/req_compactor_impl.hpp
+++ b/req/include/req_compactor_impl.hpp
@@ -29,17 +29,19 @@
 namespace datasketches {
 
 template<typename T, bool H, typename C, typename A>
-req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator):
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, const T* item_ptr, bool sorted):
 lg_weight_(lg_weight),
 coin_(false),
-sorted_(true),
+sorted_(sorted),
 section_size_raw_(section_size),
 section_size_(section_size),
 num_sections_(req_constants::INIT_NUM_SECTIONS),
 num_compactions_(0),
 state_(0),
 items_(allocator)
-{}
+{
+  if (item_ptr != nullptr) items_.push_back(*item_ptr);
+}
 
 template<typename T, bool H, typename C, typename A>
 bool req_compactor<T, H, C, A>::is_sorted() const {
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
index 6c8912c..117182a 100755
--- a/req/include/req_sketch.hpp
+++ b/req/include/req_sketch.hpp
@@ -38,7 +38,7 @@ public:
   using Compactor = req_compactor<T, IsHighRank, Comparator, Allocator>;
   using AllocCompactor = typename std::allocator_traits<Allocator>::template rebind_alloc<Compactor>;
 
-  explicit req_sketch(uint32_t k, const Allocator& allocator = Allocator());
+  explicit req_sketch(uint16_t k, const Allocator& allocator = Allocator());
   ~req_sketch();
   // TODO: copy, move, assign
 
@@ -132,7 +132,7 @@ public:
 
 private:
   Allocator allocator_;
-  uint32_t k_;
+  uint16_t k_;
   uint32_t max_nom_size_;
   uint32_t num_retained_;
   uint64_t n_;
@@ -142,7 +142,7 @@ private:
 
   static const uint8_t SERIAL_VERSION = 1;
   static const uint8_t FAMILY = 17;
-  enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM, IS_HIGH_RANK, IS_ESTIMATION_MODE };
+  enum flags { RESERVED1, RESERVED2, IS_EMPTY, IS_HIGH_RANK, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
 
   uint8_t get_num_levels() const;
   void grow();
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
index 09e19f2..bf2ce7b 100755
--- a/req/include/req_sketch_impl.hpp
+++ b/req/include/req_sketch_impl.hpp
@@ -25,9 +25,9 @@
 namespace datasketches {
 
 template<typename T, bool H, typename C, typename S, typename A>
-req_sketch<T, H, C, S, A>::req_sketch(uint32_t k, const A& allocator):
+req_sketch<T, H, C, S, A>::req_sketch(uint16_t k, const A& allocator):
 allocator_(allocator),
-k_(std::max(k & -2, req_constants::MIN_K)), //rounds down one if odd
+k_(std::max(static_cast<int>(k) & -2, static_cast<int>(req_constants::MIN_K))), //rounds down one if odd
 max_nom_size_(0),
 num_retained_(0),
 n_(0),
@@ -136,42 +136,46 @@ const T& req_sketch<T, H, C, S, A>::get_quantile(double rank) const {
 template<typename T, bool H, typename C, typename S, typename A>
 void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
   const uint8_t preamble_longs = 1;
-  os.write(reinterpret_cast<const char*>(&preamble_longs), sizeof(preamble_longs));
+  write(os, preamble_longs);
   const uint8_t serial_version = SERIAL_VERSION;
-  os.write(reinterpret_cast<const char*>(&serial_version), sizeof(serial_version));
+  write(os, serial_version);
   const uint8_t family = FAMILY;
-  os.write(reinterpret_cast<const char*>(&family), sizeof(family));
+  write(os, family);
   const bool is_single_item = n_ == 1;
   const uint8_t flags_byte(
       (is_empty() ? 1 << flags::IS_EMPTY : 0)
+      | (H ? 1 << flags::IS_HIGH_RANK : 0)
     | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
     | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
-    | (H ? 1 << flags::IS_HIGH_RANK : 0)
-    | (is_estimation_mode() ? 1 << flags::IS_ESTIMATION_MODE : 0)
   );
-  os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
-  os.write(reinterpret_cast<const char*>(&k_), sizeof(k_));
+  write(os, flags_byte);
+  write(os, k_);
+  const uint8_t num_levels = get_num_levels();
+  write(os, num_levels);
+  const uint8_t unused = 0;
+  write(os, unused);
   if (is_empty()) return;
-  if (!is_single_item) {
-    if (is_estimation_mode()) {
-      os.write(reinterpret_cast<const char*>(&n_), sizeof(n_));
-      const uint8_t num_levels = compactors_.size();
-      os.write(reinterpret_cast<const char*>(&num_levels), sizeof(num_levels));
-    }
-    // do we want some padding here?
+  if (is_estimation_mode()) {
+    write(os, n_);
     S().serialize(os, min_value_, 1);
     S().serialize(os, max_value_, 1);
   }
-  for (const auto& compactor: compactors_) compactor.serialize(os, S());
+  if (is_single_item) {
+    S().serialize(os, min_value_, 1);
+  } else {
+    for (const auto& compactor: compactors_) compactor.serialize(os, S());
+  }
 }
 
 template<typename T, bool H, typename C, typename S, typename A>
 req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& is, const A& allocator) {
-  auto preamble_longs = read<uint8_t>(is);
-  auto serial_version = read<uint8_t>(is);
-  auto family_id = read<uint8_t>(is);
-  auto flags_byte = read<uint8_t>(is);
-  auto k = read<uint32_t>(is);
+  const auto preamble_longs = read<uint8_t>(is);
+  const auto serial_version = read<uint8_t>(is);
+  const auto family_id = read<uint8_t>(is);
+  const auto flags_byte = read<uint8_t>(is);
+  const auto k = read<uint16_t>(is);
+  const auto num_levels = read<uint8_t>(is);
+  const auto unused = read<uint8_t>(is);
 
   // TODO: checks
 
@@ -180,15 +184,7 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   if (is_empty) return req_sketch(k, allocator);
 
   uint64_t n = 1;
-  uint8_t num_levels = 1;
-  const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
-  const bool is_estimation_mode = flags_byte & (1 << flags::IS_ESTIMATION_MODE);
-  if (!is_single_item) {
-    if (is_estimation_mode) {
-      n = read<uint64_t>(is);
-      num_levels = read<uint8_t>(is);
-    }
-  }
+  if (num_levels > 1) n = read<uint64_t>(is);
 
   A alloc(allocator);
   auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
@@ -197,7 +193,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
   std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
 
-  if (!is_single_item) {
+  const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
+  if (num_levels > 1) {
     S().deserialize(is, min_value_buffer.get(), 1);
     // serde call did not throw, repackage with destrtuctor
     min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
@@ -209,18 +206,38 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
   std::vector<Compactor, AllocCompactor> compactors(allocator);
   std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
-  for (size_t i = 0; i < num_levels; ++i) {
-    auto compactor = req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true);
-    compactors.push_back(std::move(compactor));
-  }
-  if (!is_estimation_mode) n = compactors[0].get_num_items();
   if (is_single_item) {
-    new (min_value_buffer.get()) T(compactors[0].get_items()[0]);
-    // copy did not throw, repackage with destrtuctor
+    S().deserialize(is, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
     min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
-    new (max_value_buffer.get()) T(compactors[0].get_items()[0]);
+    new (max_value_buffer.get()) T(*min_value);
     // copy did not throw, repackage with destrtuctor
     max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+    compactors.push_back(req_compactor<T, H, C, A>(1, k, allocator, min_value.get(), is_level_0_sorted));
+  } else {
+    if (num_levels == 1) {
+      const auto& items = compactors[0].get_items();
+      n = items.size();
+      auto min_it = items.begin();
+      auto max_it = items.begin();
+      auto it = items.begin();
+      while (it != items.end()) {
+        if (C()(*it, *min_it)) min_it = it;
+        if (C()(*max_it, *it)) max_it = it;
+        ++it;
+      }
+      new (min_value_buffer.get()) T(*min_it);
+      // copy did not throw, repackage with destructor
+      min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+      new (max_value_buffer.get()) T(*max_it);
+      // copy did not throw, repackage with destructor
+      max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+    } else {
+      for (size_t i = 0; i < num_levels; ++i) {
+        auto compactor = req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true);
+        compactors.push_back(std::move(compactor));
+      }
+    }
   }
 
   if (!is.good()) throw std::runtime_error("error reading from std::istream");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org