You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/11/17 02:37:06 UTC

[incubator-datasketches-cpp] branch req_sketch updated: serialization and merge

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git


The following commit(s) were added to refs/heads/req_sketch by this push:
     new 0446c16  serialization and merge
0446c16 is described below

commit 0446c164c1fdce34fdb76d4b9932cc8949372c11
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Mon Nov 16 18:27:56 2020 -0800

    serialization and merge
---
 common/include/memory_operations.hpp |  12 +++
 req/include/req_common.hpp           |   2 +-
 req/include/req_compactor.hpp        |  38 +++++--
 req/include/req_compactor_impl.hpp   | 133 +++++++++++++++++++-----
 req/include/req_sketch.hpp           |  35 ++++++-
 req/include/req_sketch_impl.hpp      | 195 +++++++++++++++++++++++++++++++++--
 req/test/req_sketch_test.cpp         |  82 +++++++++++++++
 7 files changed, 455 insertions(+), 42 deletions(-)

diff --git a/common/include/memory_operations.hpp b/common/include/memory_operations.hpp
index 80dc3a3..986b2b0 100644
--- a/common/include/memory_operations.hpp
+++ b/common/include/memory_operations.hpp
@@ -52,6 +52,18 @@ static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
   return size;
 }

+template<typename T>
+static inline size_t copy_to_mem(const T& item, void* dst) { // raw byte copy of item into dst; returns bytes written
+  memcpy(dst, &item, sizeof(T)); // no bounds check: dst must have room for sizeof(T) bytes; T should be trivially copyable
+  return sizeof(T);
+}
+
+template<typename T>
+static inline size_t copy_from_mem(const void* src, T& item) { // raw byte copy from src into item; returns bytes read
+  memcpy(&item, src, sizeof(T)); // no bounds check: src must hold at least sizeof(T) bytes
+  return sizeof(T);
+}
+
 } // namespace

 #endif // _MEMORY_OPERATIONS_HPP_
diff --git a/req/include/req_common.hpp b/req/include/req_common.hpp
index 78ce505..715c9a3 100755
--- a/req/include/req_common.hpp
+++ b/req/include/req_common.hpp
@@ -30,7 +30,7 @@ static std::independent_bits_engine<std::mt19937, 1, unsigned> req_random_bit(st

 namespace req_constants {
   static const uint16_t MIN_K = 4;
-  static const uint32_t INIT_NUM_SECTIONS = 3;
+  static const uint8_t INIT_NUM_SECTIONS = 3; // uint8_t to match req_compactor::num_sections_
 }

 } /* namespace datasketches */
diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
index f939a19..977fd46 100755
--- a/req/include/req_compactor.hpp
+++ b/req/include/req_compactor.hpp
@@ -44,38 +44,62 @@ public:
   template<typename FwdT>
   void append(FwdT&& item);

+  template<typename FwdC>
+  void merge(FwdC&& other); // merges other's items and schedule state into this compactor
+
   void sort();
   void merge_sort_in(std::vector<T, Allocator>&& items);

   std::vector<T, Allocator> compact();

+  /**
+   * Computes size needed to serialize the current state of the compactor.
+   * This version is for fixed-size arithmetic types (integral and floating point).
+   * @return size in bytes needed to serialize this compactor
+   */
+  template<typename S, typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes(const S& serde) const;
+
+  /**
+   * Computes size needed to serialize the current state of the compactor.
+   * This version is for all other types and can be expensive since every item needs to be looked at.
+   * @return size in bytes needed to serialize this compactor
+   */
+  template<typename S, typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes(const S& serde) const;
+
   template<typename S>
   void serialize(std::ostream& os, const S& serde) const;

   template<typename S>
+  size_t serialize(void* dst, size_t capacity, const S& serde) const; // returns the number of bytes written
+
+  template<typename S>
   static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted);

-  // for deserialization
-  req_compactor(uint8_t lg_weight, bool coin, bool sorted, double section_size_raw, uint32_t num_sections, uint32_t num_compactions, uint32_t state, std::vector<T, Allocator>&& items);
+  template<typename S>
+  static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted); // second: number of bytes consumed

 private:
   uint8_t lg_weight_;
   bool coin_; // random bit for compaction
   bool sorted_;
-  double section_size_raw_;
+  float section_size_raw_; // unrounded section size; section_size_ is its nearest even value
   uint32_t section_size_;
-  uint32_t num_sections_;
-  uint32_t num_compactions_;
-  uint32_t state_; // state of the deterministic compaction schedule
+  uint8_t num_sections_; // serialized as a single byte
+  uint64_t state_; // state of the deterministic compaction schedule; incremented on every compaction
   std::vector<T, Allocator> items_;

   bool ensure_enough_sections();
   size_t compute_compaction_range(uint32_t secs_to_compact) const;

-  static uint32_t nearest_even(double value);
+  static uint32_t nearest_even(float value);

   template<typename Iter>
   static std::vector<T, Allocator> get_evens_or_odds(Iter from, Iter to, bool flag);
+
+  // for deserialization (coin_ is not serialized and is re-drawn at random)
+  req_compactor(uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::vector<T, Allocator>&& items);
 };

 } /* namespace datasketches */
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
index dc1f87e..dce0a13 100755
--- a/req/include/req_compactor_impl.hpp
+++ b/req/include/req_compactor_impl.hpp
@@ -25,6 +25,7 @@
 #include <algorithm>

 #include "count_zeros.hpp"
+#include "conditional_forward.hpp" // conditional_forward, used by merge()

 namespace datasketches {

@@ -36,7 +37,6 @@ sorted_(sorted),
 section_size_raw_(section_size),
 section_size_(section_size),
 num_sections_(req_constants::INIT_NUM_SECTIONS),
-num_compactions_(0),
 state_(0),
 items_(allocator)
 {
@@ -86,9 +86,24 @@ void req_compactor<T, H, C, A>::append(FwdT&& item) {
 }

 template<typename T, bool H, typename C, typename A>
+template<typename FwdC>
+void req_compactor<T, H, C, A>::merge(FwdC&& other) {
+  if (lg_weight_ != other.lg_weight_) throw std::logic_error("weight mismatch");
+  state_ |= other.state_; // combine the compaction schedules of both compactors
+  while (ensure_enough_sections()) {} // grow sections as far as the combined state allows
+  sort(); // both sides are sorted before merge_sort_in()
+  std::vector<T, A> other_items(conditional_forward<FwdC>(other.items_));
+  if (!other.sorted_) std::sort(other_items.begin(), other_items.end(), C());
+  if (other_items.size() > items_.size()) std::swap(items_, other_items); // merge the smaller vector into the larger one
+  merge_sort_in(std::move(other_items));
+}
+
+template<typename T, bool H, typename C, typename A>
 void req_compactor<T, H, C, A>::sort() {
-  std::sort(items_.begin(), items_.end(), C());
-  sorted_ = true;
+  if (!sorted_) { // no-op when already sorted
+    std::sort(items_.begin(), items_.end(), C());
+    sorted_ = true;
+  }
 }

 template<typename T, bool H, typename C, typename A>
@@ -103,19 +118,18 @@ void req_compactor<T, H, C, A>::merge_sort_in(std::vector<T, A>&& items) {
 template<typename T, bool H, typename C, typename A>
 std::vector<T, A> req_compactor<T, H, C, A>::compact() {
   // choose a part of the buffer to compact
-  const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u32(~state_) + 1), num_sections_);
+  const uint32_t secs_to_compact = std::min(static_cast<uint32_t>(count_trailing_zeros_in_u64(~state_) + 1), static_cast<uint32_t>(num_sections_)); // state_ is 64-bit: count on all 64 bits, no truncation
   const size_t compaction_range = compute_compaction_range(secs_to_compact);
   const uint32_t compact_from = compaction_range & 0xFFFFFFFFLL; // low 32
   const uint32_t compact_to = compaction_range >> 32; // high 32
   if (compact_to - compact_from < 2) throw std::logic_error("compaction range error");

-  if ((num_compactions_ & 1) == 1) { coin_ = !coin_; } // for odd flip coin;
+  if ((state_ & 1) == 1) { coin_ = !coin_; } // odd state: flip the previous coin
   else { coin_ = req_random_bit(); } // random coin flip

   auto promote = get_evens_or_odds(items_.begin() + compact_from, items_.begin() + compact_to, coin_);
   items_.erase(items_.begin() + compact_from, items_.begin() + compact_to);

-  ++num_compactions_;
   ++state_;
   ensure_enough_sections();
   return promote;
@@ -123,9 +137,9 @@ std::vector<T, A> req_compactor<T, H, C, A>::compact() {

 template<typename T, bool H, typename C, typename A>
 bool req_compactor<T, H, C, A>::ensure_enough_sections() {
-  const double ssr = section_size_raw_ / sqrt(2);
+  const float ssr = static_cast<float>(section_size_raw_ / sqrt(2)); // explicit double -> float narrowing
   const uint32_t ne = nearest_even(ssr);
-  if (num_compactions_ >= 1 << (num_sections_ - 1) && ne >= req_constants::MIN_K) {
+  if (num_sections_ <= 64 && state_ >= (1ULL << (num_sections_ - 1)) && ne >= req_constants::MIN_K) { // 64-bit shift (int 1 << 31+ is UB); beyond 64 sections a uint64_t state can never reach the threshold
     section_size_raw_ = ssr;
     section_size_ = ne;
     num_sections_ <<= 1;
@@ -147,7 +161,7 @@ size_t req_compactor<T, H, C, A>::compute_compaction_range(uint32_t secs_to_comp
 }

 template<typename T, bool H, typename C, typename A>
-uint32_t req_compactor<T, H, C, A>::nearest_even(double value) {
+uint32_t req_compactor<T, H, C, A>::nearest_even(float value) {
   return static_cast<uint32_t>(round(value / 2)) << 1;
 }

@@ -180,31 +194,68 @@ static inline void write(std::ostream& os, T value) {
   os.write(reinterpret_cast<const char*>(&value), sizeof(T));
 }

+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, bool H, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, H, C, A>::get_serialized_size_bytes(const S&) const {
+  return sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) + // header fields, as written by serialize()
+      sizeof(uint16_t) + // padding
+      sizeof(uint32_t) + // num_items
+      sizeof(TT) * items_.size(); // every item has the same fixed size
+}
+
+// implementation for all other types
+template<typename T, bool H, typename C, typename A>
+template<typename S, typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_compactor<T, H, C, A>::get_serialized_size_bytes(const S& serde) const {
+  size_t size = sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) +
+      sizeof(uint16_t) + // padding
+      sizeof(uint32_t); // num_items
+  // item sizes vary, so each item is measured by the serde
+  for (const auto& item: items_) size += serde.size_of_item(item);
+  return size;
+}
+
 template<typename T, bool H, typename C, typename A>
 template<typename S>
 void req_compactor<T, H, C, A>::serialize(std::ostream& os, const S& serde) const {
   const uint32_t num_items = items_.size();
-  write(os, num_items);
-  write(os, section_size_raw_);
-  write(os, num_sections_);
-  write(os, num_compactions_);
   write(os, state_);
+  write(os, section_size_raw_);
   write(os, lg_weight_);
-  const uint8_t coin = coin_;
-  write(os, coin);
+  write(os, num_sections_);
+  const uint16_t padding = 0; // pads the header so num_items starts at byte offset 16
+  write(os, padding);
+  write(os, num_items);
   serde.serialize(os, items_.data(), items_.size());
 }

 template<typename T, bool H, typename C, typename A>
 template<typename S>
+size_t req_compactor<T, H, C, A>::serialize(void* dst, size_t capacity, const S& serde) const { // returns the number of bytes written
+  uint8_t* ptr = static_cast<uint8_t*>(dst);
+  const uint8_t* end_ptr = ptr + capacity;
+  ptr += copy_to_mem(state_, ptr); // NOTE: header fields are copied without checking capacity
+  ptr += copy_to_mem(section_size_raw_, ptr);
+  ptr += copy_to_mem(lg_weight_, ptr);
+  ptr += copy_to_mem(num_sections_, ptr);
+  const uint16_t padding = 0;
+  ptr += copy_to_mem(padding, ptr);
+  const uint32_t num_items = items_.size();
+  ptr += copy_to_mem(num_items, ptr);
+  ptr += serde.serialize(ptr, end_ptr - ptr, items_.data(), items_.size()); // serde gets the remaining capacity
+  return ptr - static_cast<uint8_t*>(dst);
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
 req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted) {
+  auto state = read<decltype(state_)>(is); // fields are read in the order written by serialize()
+  auto section_size_raw = read<decltype(section_size_raw_)>(is);
+  auto lg_weight = read<decltype(lg_weight_)>(is);
+  auto num_sections = read<decltype(num_sections_)>(is);
+  read<uint16_t>(is); // padding
   auto num_items = read<uint32_t>(is);
-  auto section_size_raw = read<double>(is);
-  auto num_sections = read<uint32_t>(is);
-  auto num_compactions = read<uint32_t>(is);
-  auto state = read<uint32_t>(is);
-  auto lg_weight = read<uint8_t>(is);
-  bool coin = read<uint8_t>(is);
   std::vector<T, A> items(allocator);
   A alloc(allocator);
   auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
@@ -215,18 +266,50 @@ req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& i
     (*item_buffer).~T();
   }
   if (!is.good()) throw std::runtime_error("error reading from std::istream");
-  return req_compactor(lg_weight, coin, sorted, section_size_raw, num_sections, num_compactions, state, std::move(items));
+  return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items)); // coin is not stored; the ctor draws a fresh random bit
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted) {
+  ensure_minimum_memory(size, sizeof(state_) + sizeof(section_size_raw_) + sizeof(lg_weight_) + sizeof(num_sections_) + sizeof(uint16_t) + sizeof(uint32_t)); // whole fixed-size header (incl. padding and num_items)
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+  uint64_t state;
+  ptr += copy_from_mem(ptr, state);
+  float section_size_raw;
+  ptr += copy_from_mem(ptr, section_size_raw);
+  uint8_t lg_weight;
+  ptr += copy_from_mem(ptr, lg_weight);
+  uint8_t num_sections;
+  ptr += copy_from_mem(ptr, num_sections);
+  ptr += 2; // padding
+  uint32_t num_items;
+  ptr += copy_from_mem(ptr, num_items);
+  std::vector<T, A> items(allocator);
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* p) { alloc.deallocate(p, 1); }; // parameter named p to avoid shadowing the read cursor
+  std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
+  for (uint32_t i = 0; i < num_items; ++i) {
+    ptr += serde.deserialize(ptr, end_ptr - ptr, item_buffer.get(), 1); // serde bounds-checks item bytes against the remaining size
+    items.push_back(std::move(*item_buffer));
+    (*item_buffer).~T();
+  }
+  return std::pair<req_compactor, size_t>(
+      req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items)),
+      ptr - static_cast<const char*>(bytes) // number of bytes consumed
+  );
 }

 template<typename T, bool H, typename C, typename A>
-req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, bool coin, bool sorted, double section_size_raw, uint32_t num_sections, uint32_t num_compactions, uint32_t state, std::vector<T, A>&& items):
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, bool sorted, float section_size_raw, uint8_t num_sections, uint64_t state, std::vector<T, A>&& items):
 lg_weight_(lg_weight),
-coin_(coin),
+coin_(req_random_bit()), // coin is not serialized, so start with a fresh random bit
 sorted_(sorted),
 section_size_raw_(section_size_raw),
 section_size_(nearest_even(section_size_raw)),
 num_sections_(num_sections),
-num_compactions_(num_compactions),
 state_(state),
 items_(std::move(items))
 {}
diff --git a/req/include/req_sketch.hpp b/req/include/req_sketch.hpp
index 2c25958..2ca578a 100755
--- a/req/include/req_sketch.hpp
+++ b/req/include/req_sketch.hpp
@@ -72,6 +72,9 @@ public:
   template<typename FwdT>
   void update(FwdT&& item);

+  template<typename FwdSk>
+  void merge(FwdSk&& other); // merges other sketch into this one
+
   /**
    * Returns the min value of the stream.
    * For floating point types: if the sketch is empty this returns NaN.
@@ -106,11 +109,40 @@ public:
   const T& get_quantile(double rank) const;

   /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for fixed-size arithmetic types (integral and floating point).
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for all other types and can be expensive since every item needs to be looked at.
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
    * This method serializes the sketch into a given stream in a binary form
    * @param os output stream
    */
   void serialize(std::ostream& os) const;

+  // This is a convenience alias for users
+  // The type returned by the following serialize method
+  using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
+
+  /**
+   * This method serializes the sketch as a vector of bytes.
+   * An optional header can be reserved in front of the sketch.
+   * It is a blank space of a given size.
+   * This header is used in Datasketches PostgreSQL extension.
+   * @param header_size_bytes space to reserve in front of the sketch
+   */
+  vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
   /**
    * This method deserializes a sketch from a given stream.
    * @param is input stream
@@ -124,7 +156,7 @@ public:
    * @param size the size of the array
    * @return an instance of a sketch
    */
-  //static req_sketch deserialize(const void* bytes, size_t size);
+  static req_sketch deserialize(const void* bytes, size_t size, const Allocator& allocator = Allocator()); // NOTE: reads the preamble at bytes[0]; skip any header reserved by serialize(header_size_bytes) first

   /**
    * Prints a summary of the sketch.
@@ -145,6 +177,7 @@ private:

   static const uint8_t SERIAL_VERSION = 1;
   static const uint8_t FAMILY = 17;
+  static const size_t PREAMBLE_SIZE_BYTES = 8; // size of the fixed preamble written by serialize()
   enum flags { RESERVED1, RESERVED2, IS_EMPTY, IS_HIGH_RANK, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };

   uint8_t get_num_levels() const;
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
index 57fec03..bf0b701 100755
--- a/req/include/req_sketch_impl.hpp
+++ b/req/include/req_sketch_impl.hpp
@@ -148,6 +148,29 @@ void req_sketch<T, H, C, S, A>::update(FwdT&& item) {
 }

 template<typename T, bool H, typename C, typename S, typename A>
+template<typename FwdSk>
+void req_sketch<T, H, C, S, A>::merge(FwdSk&& other) {
+  if (other.is_empty()) return;
+  if (is_empty()) { // checked before n_ is updated below, otherwise this branch is never taken and null min/max are dereferenced
+    min_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.min_value_));
+    max_value_ = new (allocator_.allocate(1)) T(conditional_forward<FwdSk>(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = conditional_forward<FwdSk>(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = conditional_forward<FwdSk>(*other.max_value_);
+  }
+  n_ += other.n_;
+  // grow until this has at least as many compactors as other
+  while (get_num_levels() < other.get_num_levels()) grow();
+  // merge the items of every level of other (this sketch may have more levels than other)
+  for (size_t i = 0; i < other.get_num_levels(); ++i) {
+    compactors_[i].merge(conditional_forward<FwdSk>(other.compactors_[i]));
+  }
+  update_max_nom_size();
+  update_num_retained();
+  if (num_retained_ >= max_nom_size_) compress();
+}
+
+template<typename T, bool H, typename C, typename S, typename A>
 const T& req_sketch<T, H, C, S, A>::get_min_value() const {
   if (is_empty()) return get_invalid_value();
   return *min_value_;
@@ -179,7 +202,7 @@ const T& req_sketch<T, H, C, S, A>::get_quantile(double rank) const {
     throw std::invalid_argument("Rank cannot be less than zero or greater than 1.0");
   }
   if (!compactors_[0].is_sorted()) {
-    const_cast<req_compactor<T, H, C, A>&>(compactors_[0]).sort(); // allow this side effect
+    const_cast<Compactor&>(compactors_[0]).sort(); // allow this side effect: level 0 is sorted before building the quantile calculator
   }
   req_quantile_calculator<T, A> quantile_calculator(n_, allocator_);
   for (auto& compactor: compactors_) {
@@ -189,6 +212,42 @@ const T& req_sketch<T, H, C, S, A>::get_quantile(double rank) const {
   return quantile_calculator.get_quantile(rank);
 }

+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, bool H, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, H, C, S, A>::get_serialized_size_bytes() const {
+  size_t size = PREAMBLE_SIZE_BYTES;
+  if (is_empty()) return size; // empty sketch: preamble only
+  if (is_estimation_mode()) {
+    size += sizeof(n_) + sizeof(TT) * 2; // min and max
+  }
+  if (n_ == 1) {
+    size += sizeof(TT); // the single item, written without a compactor
+  } else {
+    for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+  }
+  return size;
+}
+
+// implementation for all other types
+template<typename T, bool H, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t req_sketch<T, H, C, S, A>::get_serialized_size_bytes() const {
+  size_t size = PREAMBLE_SIZE_BYTES;
+  if (is_empty()) return size; // empty sketch: preamble only
+  if (is_estimation_mode()) {
+    size += sizeof(n_);
+    size += S().size_of_item(*min_value_); // variable-size items are measured by the serde
+    size += S().size_of_item(*max_value_);
+  }
+  if (n_ == 1) {
+    size += S().size_of_item(*min_value_); // the single item, written via the serde without a compactor
+  } else {
+    for (const auto& compactor: compactors_) size += compactor.get_serialized_size_bytes(S());
+  }
+  return size;
+}
+
 template<typename T, bool H, typename C, typename S, typename A>
 void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
   const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
@@ -224,6 +283,47 @@ void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
 }

 template<typename T, bool H, typename C, typename S, typename A>
+auto req_sketch<T, H, C, S, A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  vector_bytes bytes(size, 0, allocator_);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  const uint8_t* end_ptr = bytes.data() + size; // end of the whole buffer (ptr already skips the header, so ptr + size would overshoot)
+
+  const uint8_t preamble_ints = is_estimation_mode() ? 4 : 2;
+  ptr += copy_to_mem(preamble_ints, ptr);
+  const uint8_t serial_version = SERIAL_VERSION;
+  ptr += copy_to_mem(serial_version, ptr);
+  const uint8_t family = FAMILY;
+  ptr += copy_to_mem(family, ptr);
+  const bool is_single_item = n_ == 1;
+  const uint8_t flags_byte(
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (H ? 1 << flags::IS_HIGH_RANK : 0)
+    | (compactors_[0].is_sorted() ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
+  );
+  ptr += copy_to_mem(flags_byte, ptr);
+  ptr += copy_to_mem(k_, ptr); // deserialize() reads this back as uint16_t
+  const uint8_t num_levels = get_num_levels();
+  ptr += copy_to_mem(num_levels, ptr);
+  const uint8_t unused = 0;
+  ptr += copy_to_mem(unused, ptr);
+  if (!is_empty()) {
+    if (is_estimation_mode()) {
+      ptr += copy_to_mem(n_, ptr);
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
+    }
+    if (is_single_item) {
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+    } else {
+      for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, S());
+    }
+  }
+  return bytes;
+}
+
+template<typename T, bool H, typename C, typename S, typename A>
 req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& is, const A& allocator) {
   const auto preamble_ints = read<uint8_t>(is);
   const auto serial_version = read<uint8_t>(is);
@@ -268,10 +368,10 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
     new (max_value_buffer.get()) T(*min_value);
     // copy did not throw, repackage with destrtuctor
     max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
-    compactors.push_back(req_compactor<T, H, C, A>(1, k, allocator, min_value.get(), is_level_0_sorted));
+    compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted)); // single item: rebuild level 0 from it
   } else {
     for (size_t i = 0; i < num_levels; ++i) {
-      compactors.push_back(req_compactor<T, H, C, A>::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true));
+      compactors.push_back(Compactor::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true)); // only level 0 may be unsorted
     }
     if (num_levels == 1) {
       const auto& items = compactors[0].get_items();
@@ -297,10 +397,10 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   return req_sketch(k, n, std::move(min_value), std::move(max_value), std::move(compactors));
 }

-//template<typename T, bool H, typename C, typename S, typename A>
-//req_sketch<T, H, C, S, A> deserialize(const void* bytes, size_t size) {
-//
-//}
+template<typename T, bool H, typename C, typename S, typename A>
+req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(const void* bytes, size_t size, const A& allocator) {
+  ensure_minimum_memory(size, PREAMBLE_SIZE_BYTES);
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+
+  uint8_t preamble_ints;
+  ptr += copy_from_mem(ptr, preamble_ints);
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, serial_version);
+  uint8_t family_id;
+  ptr += copy_from_mem(ptr, family_id);
+  uint8_t flags_byte;
+  ptr += copy_from_mem(ptr, flags_byte);
+  uint16_t k;
+  ptr += copy_from_mem(ptr, k);
+  uint8_t num_levels;
+  ptr += copy_from_mem(ptr, num_levels);
+  ++ptr; // unused byte
+
+  if (serial_version != SERIAL_VERSION) throw std::invalid_argument("serial version mismatch");
+  if (family_id != FAMILY) throw std::invalid_argument("family mismatch"); // TODO: also validate preamble_ints and remaining memory size
+
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  if (is_empty) return req_sketch(k, allocator);
+
+  A alloc(allocator);
+  auto item_buffer_deleter = [&alloc](T* p) { alloc.deallocate(p, 1); }; // parameter named p to avoid shadowing the read cursor
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(alloc.allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value(nullptr, item_deleter(allocator));
+  std::unique_ptr<T, item_deleter> max_value(nullptr, item_deleter(allocator));
+
+  const bool is_single_item = flags_byte & (1 << flags::IS_SINGLE_ITEM);
+  const bool is_level_0_sorted = flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED);
+  std::vector<Compactor, AllocCompactor> compactors(allocator);
+
+  uint64_t n = 1;
+  if (num_levels > 1) {
+    ptr += copy_from_mem(ptr, n);
+    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+
+  if (is_single_item) {
+    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    new (max_value_buffer.get()) T(*min_value);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+    compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted));
+  } else {
+    for (size_t i = 0; i < num_levels; ++i) {
+      auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, i == 0 ? is_level_0_sorted : true);
+      compactors.push_back(std::move(pair.first));
+      ptr += pair.second;
+    }
+    if (num_levels == 1) {
+      const auto& items = compactors[0].get_items();
+      n = items.size();
+      auto min_it = items.begin();
+      auto max_it = items.begin();
+      auto it = items.begin();
+      while (it != items.end()) {
+        if (C()(*it, *min_it)) min_it = it;
+        if (C()(*max_it, *it)) max_it = it;
+        ++it;
+      }
+      new (min_value_buffer.get()) T(*min_it);
+      // copy did not throw, repackage with destructor
+      min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+      new (max_value_buffer.get()) T(*max_it);
+      // copy did not throw, repackage with destructor
+      max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+    }
+  }
+
+  return req_sketch(k, n, std::move(min_value), std::move(max_value), std::move(compactors));
+}

 template<typename T, bool H, typename C, typename S, typename A>
 void req_sketch<T, H, C, S, A>::grow() {
@@ -355,7 +535,6 @@ string<A> req_sketch<T, H, C, S, A>::to_string(bool print_levels, bool print_ite
   os << "   Levels         : " << compactors_.size() << std::endl;
   os << "   Retained items : " << num_retained_ << std::endl;
   os << "   Capacity items : " << max_nom_size_ << std::endl;
-//  os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
   if (!is_empty()) {
     os << "   Min value      : " << *min_value_ << std::endl;
     os << "   Max value      : " << *max_value_ << std::endl;
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
index 4520855..ec1fb6f 100755
--- a/req/test/req_sketch_test.cpp
+++ b/req/test/req_sketch_test.cpp
@@ -149,6 +149,21 @@ TEST_CASE("req sketch: stream serialize-deserialize empty", "[req_sketch]") {
   REQUIRE(std::isnan(sketch2.get_max_value()));
 }

+TEST_CASE("req sketch: byte serialize-deserialize empty", "[req_sketch]") {
+  req_sketch<float, true> sketch(100);
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes()); // predicted size must match the actual output
+  auto sketch2 = req_sketch<float, true>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(std::isnan(sketch2.get_min_value())); // empty float sketch reports NaN min/max
+  REQUIRE(std::isnan(sketch2.get_max_value()));
+}
+
 TEST_CASE("req sketch: stream serialize-deserialize single item", "[req_sketch]") {
   req_sketch<float, true> sketch(100);
   sketch.update(1);
@@ -165,6 +180,22 @@ TEST_CASE("req sketch: stream serialize-deserialize single item", "[req_sketch]"
   REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
 }

+TEST_CASE("req sketch: byte serialize-deserialize single item", "[req_sketch]") {
+  req_sketch<float, true> sketch(100);
+  sketch.update(1);
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes()); // predicted size must match the actual output
+  auto sketch2 = req_sketch<float, true>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
 TEST_CASE("req sketch: stream serialize-deserialize exact mode", "[req_sketch]") {
   req_sketch<float, true> sketch(100);
   const size_t n = 50;
@@ -183,6 +214,24 @@ TEST_CASE("req sketch: stream serialize-deserialize exact mode", "[req_sketch]")
   REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
 }

+TEST_CASE("req sketch: byte serialize-deserialize exact mode", "[req_sketch]") {
+  req_sketch<float, true> sketch(100);
+  const size_t n = 50;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE_FALSE(sketch.is_estimation_mode()); // one level: min/max are recomputed from items on deserialization
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float, true>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
 TEST_CASE("req sketch: stream serialize-deserialize estimation mode", "[req_sketch]") {
   req_sketch<float, true> sketch(100);
   const size_t n = 100000;
@@ -201,4 +250,37 @@ TEST_CASE("req sketch: stream serialize-deserialize estimation mode", "[req_sket
   REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
 }

+TEST_CASE("req sketch: byte serialize-deserialize estimation mode", "[req_sketch]") {
+  req_sketch<float, true> sketch(100);
+  const size_t n = 100000;
+  for (size_t i = 0; i < n; ++i) sketch.update(i);
+  REQUIRE(sketch.is_estimation_mode()); // multiple levels: n, min and max are written explicitly
+
+  auto bytes = sketch.serialize();
+  REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+  auto sketch2 = req_sketch<float, true>::deserialize(bytes.data(), bytes.size());
+  REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  REQUIRE(sketch2.is_empty() == sketch.is_empty());
+  REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+  REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+  REQUIRE(sketch2.get_n() == sketch.get_n());
+  REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+  REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+}
+
+TEST_CASE("req sketch: merge", "[req_sketch]") { // NOTE(review): merging into an empty sketch and into a sketch with more levels is not covered yet
+  req_sketch<float, true> sketch1(100);
+  for (size_t i = 0; i < 1000; ++i) sketch1.update(i);
+
+  req_sketch<float, true> sketch2(100);
+  for (size_t i = 1000; i < 2000; ++i) sketch2.update(i);
+
+  sketch1.merge(sketch2);
+  //std::cout << sketch1.to_string(true, true);
+  REQUIRE(sketch1.get_min_value() == 0);
+  REQUIRE(sketch1.get_max_value() == 1999);
+  //REQUIRE(sketch1.get_quantile(0.5) == 1000); // NOTE(review): disabled; presumably too strict for an approximate sketch
+  REQUIRE(sketch1.get_rank(1000) == Approx(0.5).margin(0.01));
+}
+
 } /* namespace datasketches */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org