You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/11/18 05:18:14 UTC

[incubator-datasketches-cpp] branch req_sketch updated: serialize raw items

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch req_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git


The following commit(s) were added to refs/heads/req_sketch by this push:
     new b096a63  serialize raw items
b096a63 is described below

commit b096a63512c2cf70c4ae1261422a2c1ddd6088ab
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Tue Nov 17 21:18:02 2020 -0800

    serialize raw items
---
 req/include/req_compactor.hpp      |  14 ++++-
 req/include/req_compactor_impl.hpp |  54 +++++++++++++++----
 req/include/req_sketch_impl.hpp    | 108 +++++++++++++++++--------------------
 req/test/req_sketch_test.cpp       |  34 ++++++++++--
 4 files changed, 138 insertions(+), 72 deletions(-)

diff --git a/req/include/req_compactor.hpp b/req/include/req_compactor.hpp
index 977fd46..2c429fa 100755
--- a/req/include/req_compactor.hpp
+++ b/req/include/req_compactor.hpp
@@ -30,7 +30,7 @@ typename Allocator
 >
 class req_compactor {
 public:
-  req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, const T* item_ptr = nullptr, bool sorted = true);
+  req_compactor(uint8_t lg_weight, uint32_t section_size, const Allocator& allocator, bool sorted = true);
 
   bool is_sorted() const;
   uint32_t get_num_items() const;
@@ -80,6 +80,18 @@ public:
   template<typename S>
   static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted);
 
+  template<typename S>
+  static req_compactor deserialize(std::istream& is, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items);
+
+  template<typename S>
+  static std::pair<req_compactor, size_t> deserialize(const void* bytes, size_t size, const S& serde, const Allocator& allocator, bool sorted, uint16_t k, uint8_t num_items);
+
+  template<typename S>
+  static std::vector<T, Allocator> deserialize_items(std::istream& is, const S& serde, const Allocator& allocator, size_t num);
+
+  template<typename S>
+  static std::pair<std::vector<T, Allocator>, size_t> deserialize_items(const void* bytes, size_t size, const S& serde, const Allocator& allocator, size_t num);
+
 private:
   uint8_t lg_weight_;
   bool coin_; // random bit for compaction
diff --git a/req/include/req_compactor_impl.hpp b/req/include/req_compactor_impl.hpp
index dce0a13..cc44ea6 100755
--- a/req/include/req_compactor_impl.hpp
+++ b/req/include/req_compactor_impl.hpp
@@ -30,7 +30,7 @@
 namespace datasketches {
 
 template<typename T, bool H, typename C, typename A>
-req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, const T* item_ptr, bool sorted):
+req_compactor<T, H, C, A>::req_compactor(uint8_t lg_weight, uint32_t section_size, const A& allocator, bool sorted):
 lg_weight_(lg_weight),
 coin_(false),
 sorted_(sorted),
@@ -39,9 +39,7 @@ section_size_(section_size),
 num_sections_(req_constants::INIT_NUM_SECTIONS),
 state_(0),
 items_(allocator)
-{
-  if (item_ptr != nullptr) items_.push_back(*item_ptr);
-}
+{}
 
 template<typename T, bool H, typename C, typename A>
 bool req_compactor<T, H, C, A>::is_sorted() const {
@@ -256,17 +254,32 @@ req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& i
   auto num_sections = read<decltype(num_sections_)>(is);
   read<uint16_t>(is); // padding
   auto num_items = read<uint32_t>(is);
+  auto items = deserialize_items(is, serde, allocator, num_items);
+  return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items));
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+req_compactor<T, H, C, A> req_compactor<T, H, C, A>::deserialize(std::istream& is, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) {
+  auto items = deserialize_items(is, serde, allocator, num_items);
+  return req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(items));
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::vector<T, A> req_compactor<T, H, C, A>::deserialize_items(std::istream& is, const S& serde, const A& allocator, size_t num) {
   std::vector<T, A> items(allocator);
+  items.reserve(num);
   A alloc(allocator);
   auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
   std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
-  for (uint32_t i = 0; i < num_items; ++i) {
+  for (uint32_t i = 0; i < num; ++i) {
     serde.deserialize(is, item_buffer.get(), 1);
     items.push_back(std::move(*item_buffer));
     (*item_buffer).~T();
   }
   if (!is.good()) throw std::runtime_error("error reading from std::istream");
-  return req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items));
+  return items;
 }
 
 template<typename T, bool H, typename C, typename A>
@@ -287,17 +300,40 @@ std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserial
   ptr += 2; // padding
   uint32_t num_items;
   ptr += copy_from_mem(ptr, num_items);
+  auto pair = deserialize_items(ptr, end_ptr - ptr, serde, allocator, num_items);
+  ptr += pair.second;
+  return std::pair<req_compactor, size_t>(
+      req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(pair.first)),
+      ptr - static_cast<const char*>(bytes)
+  );
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::pair<req_compactor<T, H, C, A>, size_t> req_compactor<T, H, C, A>::deserialize(const void* bytes, size_t size, const S& serde, const A& allocator, bool sorted, uint16_t k, uint8_t num_items) {
+  auto pair = deserialize_items(bytes, size, serde, allocator, num_items);
+  return std::pair<req_compactor, size_t>(
+      req_compactor(0, sorted, k, req_constants::INIT_NUM_SECTIONS, 0, std::move(pair.first)),
+      pair.second
+  );
+}
+
+template<typename T, bool H, typename C, typename A>
+template<typename S>
+std::pair<std::vector<T, A>, size_t> req_compactor<T, H, C, A>::deserialize_items(const void* bytes, size_t size, const S& serde, const A& allocator, size_t num) {
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
   std::vector<T, A> items(allocator);
   A alloc(allocator);
   auto item_buffer_deleter = [&alloc](T* ptr) { alloc.deallocate(ptr, 1); };
   std::unique_ptr<T, decltype(item_buffer_deleter)> item_buffer(alloc.allocate(1), item_buffer_deleter);
-  for (uint32_t i = 0; i < num_items; ++i) {
+  for (uint32_t i = 0; i < num; ++i) {
     ptr += serde.deserialize(ptr, end_ptr - ptr, item_buffer.get(), 1);
     items.push_back(std::move(*item_buffer));
     (*item_buffer).~T();
   }
-  return std::pair<req_compactor, size_t>(
-      req_compactor(lg_weight, sorted, section_size_raw, num_sections, state, std::move(items)),
+  return std::pair<std::vector<T, A>, size_t>(
+      std::move(items),
       ptr - static_cast<const char*>(bytes)
   );
 }
diff --git a/req/include/req_sketch_impl.hpp b/req/include/req_sketch_impl.hpp
index 542a989..96e10c2 100755
--- a/req/include/req_sketch_impl.hpp
+++ b/req/include/req_sketch_impl.hpp
@@ -265,10 +265,10 @@ void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
   );
   write(os, flags_byte);
   write(os, k_);
-  const uint8_t num_levels = get_num_levels();
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
   write(os, num_levels);
-  const uint8_t unused = 0;
-  write(os, unused);
+  const uint8_t num_raw_items = raw_items ? n_ : 0;
+  write(os, num_raw_items);
   if (is_empty()) return;
   if (is_estimation_mode()) {
     write(os, n_);
@@ -276,7 +276,7 @@ void req_sketch<T, H, C, S, A>::serialize(std::ostream& os) const {
     S().serialize(os, max_value_, 1);
   }
   if (raw_items) {
-    S().serialize(os, min_value_, 1);
+    S().serialize(os, compactors_[0].get_items().data(), num_raw_items);
   } else {
     for (const auto& compactor: compactors_) compactor.serialize(os, S());
   }
@@ -304,10 +304,10 @@ auto req_sketch<T, H, C, S, A>::serialize(unsigned header_size_bytes) const -> v
   );
   ptr += copy_to_mem(flags_byte, ptr);
   ptr += copy_to_mem(k_, ptr);
-  const uint8_t num_levels = get_num_levels();
+  const uint8_t num_levels = is_empty() ? 0 : get_num_levels();
   ptr += copy_to_mem(num_levels, ptr);
-  const uint8_t unused = 0;
-  ptr += copy_to_mem(unused, ptr);
+  const uint8_t num_raw_items = raw_items ? n_ : 0;
+  ptr += copy_to_mem(num_raw_items, ptr);
   if (!is_empty()) {
     if (is_estimation_mode()) {
       ptr += copy_to_mem(n_, ptr);
@@ -315,7 +315,7 @@ auto req_sketch<T, H, C, S, A>::serialize(unsigned header_size_bytes) const -> v
       ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
     }
     if (raw_items) {
-      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, compactors_[0].get_items().data(), num_raw_items);
     } else {
       for (const auto& compactor: compactors_) ptr += compactor.serialize(ptr, end_ptr - ptr, S());
     }
@@ -331,9 +331,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   const auto flags_byte = read<uint8_t>(is);
   const auto k = read<uint16_t>(is);
   const auto num_levels = read<uint8_t>(is);
-  read<uint8_t>(is); // unused byte
+  const auto num_raw_items = read<uint8_t>(is);
 
-  std::cout << "flags=" << std::hex << ((int)flags_byte) << "\n";
   // TODO: checks
 
   if (!is.good()) throw std::runtime_error("error reading from std::istream");
@@ -363,35 +362,29 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(std::istream& i
   }
 
   if (raw_items) {
-    S().deserialize(is, min_value_buffer.get(), 1);
-    // serde call did not throw, repackage with destrtuctor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
-    new (max_value_buffer.get()) T(*min_value);
-    // copy did not throw, repackage with destrtuctor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
-    compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted));
+    compactors.push_back(Compactor::deserialize(is, S(), allocator, is_level_0_sorted, k, num_raw_items));
   } else {
     for (size_t i = 0; i < num_levels; ++i) {
       compactors.push_back(Compactor::deserialize(is, S(), allocator, i == 0 ? is_level_0_sorted : true));
     }
-    if (num_levels == 1) {
-      const auto& items = compactors[0].get_items();
-      n = items.size();
-      auto min_it = items.begin();
-      auto max_it = items.begin();
-      auto it = items.begin();
-      while (it != items.end()) {
-        if (C()(*it, *min_it)) min_it = it;
-        if (C()(*max_it, *it)) max_it = it;
-        ++it;
-      }
-      new (min_value_buffer.get()) T(*min_it);
-      // copy did not throw, repackage with destrtuctor
-      min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
-      new (max_value_buffer.get()) T(*max_it);
-      // copy did not throw, repackage with destrtuctor
-      max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+  if (num_levels == 1) {
+    const auto& items = compactors[0].get_items();
+    n = items.size();
+    auto min_it = items.begin();
+    auto max_it = items.begin();
+    auto it = items.begin();
+    while (it != items.end()) {
+      if (C()(*it, *min_it)) min_it = it;
+      if (C()(*max_it, *it)) max_it = it;
+      ++it;
     }
+    new (min_value_buffer.get()) T(*min_it);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    new (max_value_buffer.get()) T(*max_it);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
 
   if (!is.good()) throw std::runtime_error("error reading from std::istream");
@@ -416,7 +409,8 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(const void* byt
   ptr += copy_from_mem(ptr, k);
   uint8_t num_levels;
   ptr += copy_from_mem(ptr, num_levels);
-  ++ptr; // unused byte
+  uint8_t num_raw_items;
+  ptr += copy_from_mem(ptr, num_raw_items);
 
   // TODO: checks
   // ensure memory size
@@ -447,37 +441,33 @@ req_sketch<T, H, C, S, A> req_sketch<T, H, C, S, A>::deserialize(const void* byt
   }
 
   if (raw_items) {
-    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
-    // serde call did not throw, repackage with destrtuctor
-    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
-    new (max_value_buffer.get()) T(*min_value);
-    // copy did not throw, repackage with destrtuctor
-    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
-    compactors.push_back(Compactor(1, k, allocator, min_value.get(), is_level_0_sorted));
+    auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, is_level_0_sorted, k, num_raw_items);
+    compactors.push_back(std::move(pair.first));
+    ptr += pair.second;
   } else {
     for (size_t i = 0; i < num_levels; ++i) {
       auto pair = Compactor::deserialize(ptr, end_ptr - ptr, S(), allocator, i == 0 ? is_level_0_sorted : true);
       compactors.push_back(std::move(pair.first));
       ptr += pair.second;
     }
-    if (num_levels == 1) {
-      const auto& items = compactors[0].get_items();
-      n = items.size();
-      auto min_it = items.begin();
-      auto max_it = items.begin();
-      auto it = items.begin();
-      while (it != items.end()) {
-        if (C()(*it, *min_it)) min_it = it;
-        if (C()(*max_it, *it)) max_it = it;
-        ++it;
-      }
-      new (min_value_buffer.get()) T(*min_it);
-      // copy did not throw, repackage with destrtuctor
-      min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
-      new (max_value_buffer.get()) T(*max_it);
-      // copy did not throw, repackage with destrtuctor
-      max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
+  }
+  if (num_levels == 1) {
+    const auto& items = compactors[0].get_items();
+    n = items.size();
+    auto min_it = items.begin();
+    auto max_it = items.begin();
+    auto it = items.begin();
+    while (it != items.end()) {
+      if (C()(*it, *min_it)) min_it = it;
+      if (C()(*max_it, *it)) max_it = it;
+      ++it;
     }
+    new (min_value_buffer.get()) T(*min_it);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter(allocator));
+    new (max_value_buffer.get()) T(*max_it);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter(allocator));
   }
 
   return req_sketch(k, n, std::move(min_value), std::move(max_value), std::move(compactors));
diff --git a/req/test/req_sketch_test.cpp b/req/test/req_sketch_test.cpp
index 2953a8a..e0e6c9a 100755
--- a/req/test/req_sketch_test.cpp
+++ b/req/test/req_sketch_test.cpp
@@ -301,7 +301,6 @@ TEST_CASE("req sketch: stream deserialize from Java - empty", "[req_sketch]") {
   is.exceptions(std::ios::failbit | std::ios::badbit);
   is.open(input_path + "req_float_empty_from_java.sk", std::ios::binary);
   auto sketch = req_sketch<float, true>::deserialize(is);
-  std::cout << sketch.to_string();
   REQUIRE(sketch.is_empty());
   REQUIRE_FALSE(sketch.is_estimation_mode());
   REQUIRE(sketch.get_n() == 0);
@@ -315,13 +314,42 @@ TEST_CASE("req sketch: stream deserialize from Java - single item", "[req_sketch
   is.exceptions(std::ios::failbit | std::ios::badbit);
   is.open(input_path + "req_float_single_item_from_java.sk", std::ios::binary);
   auto sketch = req_sketch<float, true>::deserialize(is);
-  std::cout << sketch.to_string();
   REQUIRE_FALSE(sketch.is_empty());
   REQUIRE_FALSE(sketch.is_estimation_mode());
   REQUIRE(sketch.get_n() == 1);
   REQUIRE(sketch.get_num_retained() == 1);
   REQUIRE(sketch.get_min_value() == 1);
   REQUIRE(sketch.get_max_value() == 1);
+  REQUIRE(sketch.get_rank(1) == 0);
+  REQUIRE(sketch.get_rank<true>(1) == 1);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - raw items", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_raw_items_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float, true>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 4);
+  REQUIRE(sketch.get_num_retained() == 4);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 3);
+  REQUIRE(sketch.get_rank(2) == 0.5);
+}
+
+TEST_CASE("req sketch: stream deserialize from Java - exact mode", "[req_sketch]") {
+  std::ifstream is;
+  is.exceptions(std::ios::failbit | std::ios::badbit);
+  is.open(input_path + "req_float_exact_from_java.sk", std::ios::binary);
+  auto sketch = req_sketch<float, true>::deserialize(is);
+  REQUIRE_FALSE(sketch.is_empty());
+  REQUIRE_FALSE(sketch.is_estimation_mode());
+  REQUIRE(sketch.get_n() == 100);
+  REQUIRE(sketch.get_num_retained() == 100);
+  REQUIRE(sketch.get_min_value() == 0);
+  REQUIRE(sketch.get_max_value() == 99);
+  REQUIRE(sketch.get_rank(50) == 0.5);
 }
 
 TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sketch]") {
@@ -329,13 +357,13 @@ TEST_CASE("req sketch: stream deserialize from Java - estimation mode", "[req_sk
   is.exceptions(std::ios::failbit | std::ios::badbit);
   is.open(input_path + "req_float_estimation_from_java.sk", std::ios::binary);
   auto sketch = req_sketch<float, true>::deserialize(is);
-  std::cout << sketch.to_string();
   REQUIRE_FALSE(sketch.is_empty());
   REQUIRE(sketch.is_estimation_mode());
   REQUIRE(sketch.get_n() == 10000);
   REQUIRE(sketch.get_num_retained() == 2942);
   REQUIRE(sketch.get_min_value() == 0);
   REQUIRE(sketch.get_max_value() == 9999);
+  REQUIRE(sketch.get_rank(5000) == 0.5);
 }
 
 TEST_CASE("req sketch: merge", "[req_sketch]") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org