You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/07/15 01:14:49 UTC

[incubator-datasketches-cpp] branch tuple_sketch updated: stream serialization

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch tuple_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git


The following commit(s) were added to refs/heads/tuple_sketch by this push:
     new 66d1860  stream serialization
66d1860 is described below

commit 66d1860643b800e944ae930fff24bf842f4e30a3
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Tue Jul 14 18:14:38 2020 -0700

    stream serialization
---
 tuple/include/tuple_sketch.hpp      |   6 +--
 tuple/include/tuple_sketch_impl.hpp | 101 ++++++++++++++++++++++++++++++++----
 tuple/test/tuple_sketch_test.cpp    |  41 +++++++++++----
 3 files changed, 124 insertions(+), 24 deletions(-)

diff --git a/tuple/include/tuple_sketch.hpp b/tuple/include/tuple_sketch.hpp
index 977bf10..90d2046 100644
--- a/tuple/include/tuple_sketch.hpp
+++ b/tuple/include/tuple_sketch.hpp
@@ -134,8 +134,6 @@ public:
   virtual const_iterator end() const = 0;
 
 protected:
-  enum flags { IS_BIG_ENDIAN, IS_READ_ONLY, IS_EMPTY, IS_COMPACT, IS_ORDERED };
-
   virtual void print_specifics(std::ostringstream& os) const = 0;
 
   static uint16_t get_seed_hash(uint64_t seed);
@@ -175,8 +173,6 @@ public:
   using tuple_map = theta_update_sketch_base<Entry, ExtractKey, AllocEntry>;
   using resize_factor = typename tuple_map::resize_factor;
 
-  static const uint8_t SKETCH_TYPE = 2;
-
   // No constructor here. Use builder instead.
   class builder;
 
@@ -398,6 +394,8 @@ public:
   compact_tuple_sketch(bool is_empty, bool is_ordered, uint16_t seed_hash, uint64_t theta, std::vector<Entry, AllocEntry>&& entries);
 
 private:
+  enum flags { IS_BIG_ENDIAN, IS_READ_ONLY, IS_EMPTY, IS_COMPACT, IS_ORDERED };
+
   bool is_empty_;
   bool is_ordered_;
   uint16_t seed_hash_;
diff --git a/tuple/include/tuple_sketch_impl.hpp b/tuple/include/tuple_sketch_impl.hpp
index 23a5a3b..75fa337 100644
--- a/tuple/include/tuple_sketch_impl.hpp
+++ b/tuple/include/tuple_sketch_impl.hpp
@@ -312,8 +312,41 @@ size_t compact_tuple_sketch<S, A>::get_serialized_size_summaries_bytes(const SD&
 template<typename S, typename A>
 template<typename SerDe>
 void compact_tuple_sketch<S, A>::serialize(std::ostream& os, const SerDe& sd) const { // writes this sketch to os: 8-byte header, optional count/theta, all keys, then all summaries via sd
-  unused(os);
-  unused(sd);
+  const bool is_single_item = entries_.size() == 1 && !this->is_estimation_mode(); // exactly one entry and not in estimation mode
+  const uint8_t preamble_longs = this->is_empty() || is_single_item ? 1 : this->is_estimation_mode() ? 3 : 2; // 1 if empty or single item, otherwise 3 in estimation mode, otherwise 2
+  os.write((char*)&preamble_longs, sizeof(preamble_longs)); // each field is written as its raw in-memory bytes
+  const uint8_t serial_version = SERIAL_VERSION;
+  os.write((char*)&serial_version, sizeof(serial_version));
+  const uint8_t type = SKETCH_TYPE;
+  os.write((char*)&type, sizeof(type));
+  const uint16_t unused16 = 0; // two zero bytes of filler
+  os.write((char*)&unused16, sizeof(unused16));
+  const uint8_t flags_byte( // each flag occupies the bit whose index is its enum value
+    (1 << flags::IS_COMPACT) | // always set
+    (1 << flags::IS_READ_ONLY) | // always set
+    (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
+    (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
+  ); // NOTE(review): IS_BIG_ENDIAN is never set here
+  os.write((char*)&flags_byte, sizeof(flags_byte));
+  const uint16_t seed_hash = get_seed_hash();
+  os.write((char*)&seed_hash, sizeof(seed_hash));
+  if (!this->is_empty()) { // an empty sketch consists of the 8 header bytes above and nothing else
+    if (!is_single_item) { // the single-item form omits the entry count and theta
+      const uint32_t num_entries = entries_.size(); // NOTE(review): size() is converted to 32 bits here
+      os.write((char*)&num_entries, sizeof(num_entries));
+      const uint32_t unused32 = 0; // four zero bytes of filler after the count
+      os.write((char*)&unused32, sizeof(unused32));
+      if (this->is_estimation_mode()) { // theta is written only in estimation mode
+        os.write((char*)&(this->theta_), sizeof(uint64_t));
+      }
+    }
+    for (const auto& it: entries_) { // first pass: every key, sizeof(uint64_t) bytes each
+      os.write((char*)&it.first, sizeof(uint64_t));
+    }
+    for (const auto& it: entries_) { // second pass: every summary, one at a time through the caller-supplied SerDe
+      sd.serialize(os, &it.second, 1);
+    }
+  }
 }
 
 template<typename S, typename A>
@@ -335,10 +368,10 @@ auto compact_tuple_sketch<S, A>::serialize(unsigned header_size_bytes, const Ser
   const uint16_t unused16 = 0;
   ptr += copy_to_mem(&unused16, ptr, sizeof(unused16));
   const uint8_t flags_byte(
-    (1 << Base::flags::IS_COMPACT) |
-    (1 << Base::flags::IS_READ_ONLY) |
-    (this->is_empty() ? 1 << Base::flags::IS_EMPTY : 0) |
-    (this->is_ordered() ? 1 << Base::flags::IS_ORDERED : 0)
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
+    (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
   ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
   const uint16_t seed_hash = get_seed_hash();
@@ -365,6 +398,57 @@ auto compact_tuple_sketch<S, A>::serialize(unsigned header_size_bytes, const Ser
 
 template<typename S, typename A>
 template<typename SerDe>
+compact_tuple_sketch<S, A> compact_tuple_sketch<S, A>::deserialize(std::istream& is, uint64_t seed, const SerDe& sd) { // builds a compact sketch from is: 8-byte header, optional count/theta, keys, then summaries via sd
+  uint8_t preamble_longs; // NOTE(review): header fields are not initialized; a failed read leaves them indeterminate
+  is.read((char*)&preamble_longs, sizeof(preamble_longs));
+  uint8_t serial_version;
+  is.read((char*)&serial_version, sizeof(serial_version));
+  uint8_t type;
+  is.read((char*)&type, sizeof(type));
+  uint16_t unused16; // two bytes consumed and never used
+  is.read((char*)&unused16, sizeof(unused16));
+  uint8_t flags_byte;
+  is.read((char*)&flags_byte, sizeof(flags_byte));
+  uint16_t seed_hash;
+  is.read((char*)&seed_hash, sizeof(seed_hash));
+  checker<true>::check_sketch_type(type, SKETCH_TYPE); // checker is defined outside this view; presumably rejects a mismatch - TODO confirm
+  checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed)); // seed hash is compared only for non-empty sketches
+
+  uint64_t theta = theta_constants::MAX_THETA; // kept unless a theta value is read below
+  uint32_t num_entries = 0;
+  if (!is_empty) {
+    if (preamble_longs == 1) { // non-empty with preamble_longs == 1: exactly one entry, no count or theta stored
+      num_entries = 1;
+    } else {
+      is.read((char*)&num_entries, sizeof(num_entries));
+      uint32_t unused32; // four bytes consumed and never used
+      is.read((char*)&unused32, sizeof(unused32));
+      if (preamble_longs > 2) { // theta is present only when preamble_longs exceeds 2
+        is.read((char*)&theta, sizeof(theta));
+      }
+    }
+  }
+  std::vector<Entry, AllocEntry> entries;
+  if (!is_empty) {
+    entries.reserve(num_entries); // NOTE(review): num_entries comes from the stream and sizes these allocations before any stream-state check
+    std::vector<uint64_t, AllocU64> keys(num_entries);
+    is.read((char*)keys.data(), num_entries * sizeof(uint64_t)); // all keys in one bulk read
+    std::unique_ptr<S, deleter_of_summaries> summaries(A().allocate(num_entries), deleter_of_summaries(num_entries, false)); // storage for num_entries summaries; deleter presumably starts with destruction disabled - TODO confirm
+    sd.deserialize(is, summaries.get(), num_entries);
+    summaries.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
+    for (size_t i = 0; i < num_entries; ++i) {
+      entries.push_back(Entry(keys[i], std::move(summaries.get()[i]))); // pair each key with its summary, moving the summary out of the temporary array
+    }
+  }
+  if (!is.good()) throw std::runtime_error("error reading from std::istream"); // the only stream-state check in this function; runs after all reads and allocations above
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+  return compact_tuple_sketch(is_empty, is_ordered, seed_hash, theta, std::move(entries));
+}
+
+template<typename S, typename A>
+template<typename SerDe>
 compact_tuple_sketch<S, A> compact_tuple_sketch<S, A>::deserialize(const void* bytes, size_t size, uint64_t seed, const SerDe& sd) {
   ensure_minimum_memory(size, 8);
   const char* ptr = static_cast<const char*>(bytes);
@@ -383,7 +467,7 @@ compact_tuple_sketch<S, A> compact_tuple_sketch<S, A>::deserialize(const void* b
   ptr += copy_from_mem(ptr, &seed_hash, sizeof(seed_hash));
   checker<true>::check_sketch_type(type, SKETCH_TYPE);
   checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
-  const bool is_empty = flags_byte & (1 << Base::flags::IS_EMPTY);
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
   if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
 
   uint64_t theta = theta_constants::MAX_THETA;
@@ -404,7 +488,6 @@ compact_tuple_sketch<S, A> compact_tuple_sketch<S, A>::deserialize(const void* b
     }
   }
   const size_t keys_size_bytes = sizeof(uint64_t) * num_entries;
-  //check_memory_size(ptr - base + keys_size_bytes, size);
   ensure_minimum_memory(size, ptr - base + keys_size_bytes);
   std::vector<Entry, AllocEntry> entries;
   if (!is_empty) {
@@ -418,7 +501,7 @@ compact_tuple_sketch<S, A> compact_tuple_sketch<S, A>::deserialize(const void* b
       entries.push_back(Entry(keys[i], std::move(summaries.get()[i])));
     }
   }
-  const bool is_ordered = flags_byte & (1 << Base::flags::IS_ORDERED);
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
   return compact_tuple_sketch(is_empty, is_ordered, seed_hash, theta, std::move(entries));
 }
 
diff --git a/tuple/test/tuple_sketch_test.cpp b/tuple/test/tuple_sketch_test.cpp
index a50cb48..560fef3 100644
--- a/tuple/test/tuple_sketch_test.cpp
+++ b/tuple/test/tuple_sketch_test.cpp
@@ -50,6 +50,7 @@ TEST_CASE("tuple sketch float: builder", "[tuple_sketch]") {
 
 TEST_CASE("tuple sketch float: empty", "[tuple_sketch]") {
   auto update_sketch = update_tuple_sketch<float>::builder().build();
+  std::cout << "sizeof(update_tuple_sketch<float>)=" << sizeof(update_sketch) << std::endl;
   REQUIRE(update_sketch.is_empty());
   REQUIRE(!update_sketch.is_estimation_mode());
   REQUIRE(update_sketch.get_estimate() == 0);
@@ -60,6 +61,7 @@ TEST_CASE("tuple sketch float: empty", "[tuple_sketch]") {
   REQUIRE(!update_sketch.is_ordered());
 
   auto compact_sketch = update_sketch.compact();
+  std::cout << "sizeof(compact_tuple_sketch<float>)=" << sizeof(compact_sketch) << std::endl;
   REQUIRE(compact_sketch.is_empty());
   REQUIRE(!compact_sketch.is_estimation_mode());
   REQUIRE(compact_sketch.get_estimate() == 0);
@@ -108,17 +110,34 @@ TEST_CASE("tuple sketch float: exact mode", "[tuple_sketch]") {
   }
   REQUIRE(count == 2);
 
-  auto bytes = compact_sketch.serialize();
-  auto deserialized_sketch = compact_tuple_sketch<float>::deserialize(bytes.data(), bytes.size());
-  REQUIRE(!deserialized_sketch.is_empty());
-  REQUIRE(!deserialized_sketch.is_estimation_mode());
-  REQUIRE(deserialized_sketch.get_estimate() == 2);
-  REQUIRE(deserialized_sketch.get_lower_bound(1) == 2);
-  REQUIRE(deserialized_sketch.get_upper_bound(1) == 2);
-  REQUIRE(deserialized_sketch.get_theta() == 1);
-  REQUIRE(deserialized_sketch.get_num_retained() == 2);
-  REQUIRE(deserialized_sketch.is_ordered());
-//  std::cout << deserialized_sketch.to_string(true);
+  { // stream
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    compact_sketch.serialize(s);
+    auto deserialized_sketch = compact_tuple_sketch<float>::deserialize(s);
+    REQUIRE(!deserialized_sketch.is_empty());
+    REQUIRE(!deserialized_sketch.is_estimation_mode());
+    REQUIRE(deserialized_sketch.get_estimate() == 2);
+    REQUIRE(deserialized_sketch.get_lower_bound(1) == 2);
+    REQUIRE(deserialized_sketch.get_upper_bound(1) == 2);
+    REQUIRE(deserialized_sketch.get_theta() == 1);
+    REQUIRE(deserialized_sketch.get_num_retained() == 2);
+    REQUIRE(deserialized_sketch.is_ordered());
+    std::cout << "deserialized sketch:" << std::endl;
+    std::cout << deserialized_sketch.to_string(true);
+  }
+  { // bytes
+    auto bytes = compact_sketch.serialize();
+    auto deserialized_sketch = compact_tuple_sketch<float>::deserialize(bytes.data(), bytes.size());
+    REQUIRE(!deserialized_sketch.is_empty());
+    REQUIRE(!deserialized_sketch.is_estimation_mode());
+    REQUIRE(deserialized_sketch.get_estimate() == 2);
+    REQUIRE(deserialized_sketch.get_lower_bound(1) == 2);
+    REQUIRE(deserialized_sketch.get_upper_bound(1) == 2);
+    REQUIRE(deserialized_sketch.get_theta() == 1);
+    REQUIRE(deserialized_sketch.get_num_retained() == 2);
+    REQUIRE(deserialized_sketch.is_ordered());
+//    std::cout << deserialized_sketch.to_string(true);
+  }
 }
 
 template<typename T>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org