You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/07/23 03:27:50 UTC

[incubator-datasketches-cpp] 01/03: serialization and deserialization

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch tuple_sketch
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit c32d55d5799fdd509c5f67206e413093d9986f55
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Wed Jul 22 20:26:45 2020 -0700

    serialization and deserialization
---
 tuple/include/theta_sketch_experimental.hpp      |  14 +-
 tuple/include/theta_sketch_experimental_impl.hpp | 167 ++++++++++++++++++++++-
 2 files changed, 175 insertions(+), 6 deletions(-)

diff --git a/tuple/include/theta_sketch_experimental.hpp b/tuple/include/theta_sketch_experimental.hpp
index 64629e7..49c4dd7 100644
--- a/tuple/include/theta_sketch_experimental.hpp
+++ b/tuple/include/theta_sketch_experimental.hpp
@@ -20,7 +20,6 @@
 #ifndef THETA_SKETCH_EXPERIMENTAL_HPP_
 #define THETA_SKETCH_EXPERIMENTAL_HPP_
 
-#include "serde.hpp"
 #include "theta_update_sketch_base.hpp"
 
 namespace datasketches {
@@ -284,9 +283,9 @@ public:
   virtual const_iterator end() const;
 
 private:
-  enum flags { IS_BIG_ENDIAN, IS_READ_ONLY, IS_EMPTY, IS_COMPACT, IS_ORDERED };
   theta_table table_;
 
+  // for builder
   update_theta_sketch_experimental(uint8_t lg_cur_size, uint8_t lg_nom_size, resize_factor rf, uint64_t theta,
       uint64_t seed, const Allocator& allocator);
 
@@ -304,6 +303,9 @@ public:
   using AllocBytes = typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>;
   using vector_bytes = std::vector<uint8_t, AllocBytes>;
 
+  static const uint8_t SERIAL_VERSION = 3;
+  static const uint8_t SKETCH_TYPE = 3;
+
   // Instances of this type can be obtained:
   // - by compacting an update_theta_sketch
   // - as a result of a set operation
@@ -349,7 +351,8 @@ public:
    * @param seed the seed for the hash function that was used to create the sketch
    * @return an instance of the sketch
    */
-  static compact_theta_sketch_experimental deserialize(std::istream& is, uint64_t seed = DEFAULT_SEED);
+  static compact_theta_sketch_experimental deserialize(std::istream& is,
+      uint64_t seed = DEFAULT_SEED, const Allocator& allocator = Allocator());
 
   /**
    * This method deserializes a sketch from a given array of bytes.
@@ -358,12 +361,15 @@ public:
    * @param seed the seed for the hash function that was used to create the sketch
    * @return an instance of the sketch
    */
-  static compact_theta_sketch_experimental deserialize(const void* bytes, size_t size, uint64_t seed = DEFAULT_SEED);
+  static compact_theta_sketch_experimental deserialize(const void* bytes, size_t size,
+      uint64_t seed = DEFAULT_SEED, const Allocator& allocator = Allocator());
 
   // for internal use
   compact_theta_sketch_experimental(bool is_empty, bool is_ordered, uint16_t seed_hash, uint64_t theta, std::vector<uint64_t, Allocator>&& entries);
 
 private:
+  enum flags { IS_BIG_ENDIAN, IS_READ_ONLY, IS_EMPTY, IS_COMPACT, IS_ORDERED };
+
   bool is_empty_;
   bool is_ordered_;
   uint16_t seed_hash_;
diff --git a/tuple/include/theta_sketch_experimental_impl.hpp b/tuple/include/theta_sketch_experimental_impl.hpp
index 4471215..f5bfd66 100644
--- a/tuple/include/theta_sketch_experimental_impl.hpp
+++ b/tuple/include/theta_sketch_experimental_impl.hpp
@@ -19,7 +19,9 @@
 
 #include <sstream>
 
+#include "serde.hpp"
 #include "binomial_bounds.hpp"
+#include "theta_helpers.hpp"
 
 namespace datasketches {
 
@@ -241,7 +243,7 @@ update_theta_sketch_experimental<A> update_theta_sketch_experimental<A>::builder
 template<typename A>
 compact_theta_sketch_experimental<A>::compact_theta_sketch_experimental(const Base& other, bool ordered):
 is_empty_(other.is_empty()),
-is_ordered_(other.is_ordered()),
+is_ordered_(other.is_ordered() || ordered),
 seed_hash_(other.get_seed_hash()),
 theta_(other.get_theta64()),
 entries_(other.get_allocator())
@@ -312,7 +314,168 @@ auto compact_theta_sketch_experimental<A>::end() const -> const_iterator {
 }
 
 template<typename A>
-void compact_theta_sketch_experimental<A>::print_specifics(std::ostringstream& os) const {
+void compact_theta_sketch_experimental<A>::print_specifics(std::ostringstream&) const {}
+
+// Writes this sketch to the stream as raw in-memory bytes: one 8-byte preamble long, then (unless
+// the sketch is empty or holds a single entry outside estimation mode) the entry count, 4 unused
+// bytes and, in estimation mode, theta; the retained entries come last.
+template<typename A>
+void compact_theta_sketch_experimental<A>::serialize(std::ostream& os) const {
+  const bool estimating = this->is_estimation_mode();
+  const bool single_item = !estimating && entries_.size() == 1;
+  uint8_t preamble_longs = 2; // default: explicit entry count, no theta
+  if (this->is_empty() || single_item) preamble_longs = 1;
+  else if (estimating) preamble_longs = 3; // third long carries theta
+  const uint8_t version = SERIAL_VERSION;
+  const uint8_t sketch_type = SKETCH_TYPE;
+  const uint16_t padding16 = 0;
+  uint8_t flag_bits = (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY);
+  if (this->is_empty()) flag_bits |= 1 << flags::IS_EMPTY;
+  if (this->is_ordered()) flag_bits |= 1 << flags::IS_ORDERED;
+  const uint16_t hash_of_seed = get_seed_hash();
+  os.write(reinterpret_cast<const char*>(&preamble_longs), sizeof(preamble_longs));
+  os.write(reinterpret_cast<const char*>(&version), sizeof(version));
+  os.write(reinterpret_cast<const char*>(&sketch_type), sizeof(sketch_type));
+  os.write(reinterpret_cast<const char*>(&padding16), sizeof(padding16));
+  os.write(reinterpret_cast<const char*>(&flag_bits), sizeof(flag_bits));
+  os.write(reinterpret_cast<const char*>(&hash_of_seed), sizeof(hash_of_seed));
+  if (this->is_empty()) return; // nothing follows the first preamble long for an empty sketch
+  if (!single_item) {
+    const uint32_t count = entries_.size();
+    const uint32_t padding32 = 0;
+    os.write(reinterpret_cast<const char*>(&count), sizeof(count));
+    os.write(reinterpret_cast<const char*>(&padding32), sizeof(padding32));
+    if (estimating) os.write(reinterpret_cast<const char*>(&this->theta_), sizeof(uint64_t));
+  }
+  os.write(reinterpret_cast<const char*>(entries_.data()), entries_.size() * sizeof(uint64_t));
+}
+
+// Serializes this sketch into a new byte vector. The first header_size_bytes bytes are left as the
+// zero fill the vector is constructed with; the sketch image (preamble, optional count/theta, entries) follows.
+template<typename A>
+auto compact_theta_sketch_experimental<A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
+  const bool estimating = this->is_estimation_mode();
+  const bool single_item = !estimating && entries_.size() == 1;
+  uint8_t preamble_longs = 2; // default: explicit entry count, no theta
+  if (this->is_empty() || single_item) preamble_longs = 1;
+  else if (estimating) preamble_longs = 3; // third long carries theta
+  const size_t total_bytes = header_size_bytes + sizeof(uint64_t) * (preamble_longs + entries_.size());
+  vector_bytes image(total_bytes, 0, entries_.get_allocator());
+  uint8_t* out = image.data() + header_size_bytes; // skip the caller-requested header area
+
+  const uint8_t version = SERIAL_VERSION;
+  const uint8_t sketch_type = SKETCH_TYPE;
+  const uint16_t padding16 = 0;
+  uint8_t flag_bits = (1 << flags::IS_COMPACT) | (1 << flags::IS_READ_ONLY);
+  if (this->is_empty()) flag_bits |= 1 << flags::IS_EMPTY;
+  if (this->is_ordered()) flag_bits |= 1 << flags::IS_ORDERED;
+  const uint16_t hash_of_seed = get_seed_hash();
+  // first preamble long: 1 + 1 + 1 + 2 + 1 + 2 = 8 bytes
+  out += copy_to_mem(&preamble_longs, out, sizeof(preamble_longs));
+  out += copy_to_mem(&version, out, sizeof(version));
+  out += copy_to_mem(&sketch_type, out, sizeof(sketch_type));
+  out += copy_to_mem(&padding16, out, sizeof(padding16));
+  out += copy_to_mem(&flag_bits, out, sizeof(flag_bits));
+  out += copy_to_mem(&hash_of_seed, out, sizeof(hash_of_seed));
+  if (!this->is_empty()) {
+    if (!single_item) {
+      const uint32_t count = entries_.size();
+      const uint32_t padding32 = 0;
+      out += copy_to_mem(&count, out, sizeof(count));
+      out += copy_to_mem(&padding32, out, sizeof(padding32));
+      if (estimating) out += copy_to_mem(&theta_, out, sizeof(uint64_t));
+    }
+    out += copy_to_mem(entries_.data(), out, entries_.size() * sizeof(uint64_t));
+  }
+  return image;
+}
+
+// Reads a compact theta sketch from a stream; throws std::runtime_error when the stream is not good.
+// Stream state is verified before header fields are interpreted and before entries are allocated.
+template<typename A>
+compact_theta_sketch_experimental<A> compact_theta_sketch_experimental<A>::deserialize(std::istream& is, uint64_t seed, const A& allocator) {
+  uint8_t preamble_longs = 0;
+  uint8_t serial_version = 0;
+  uint8_t type = 0;
+  uint16_t unused16 = 0;
+  uint8_t flags_byte = 0;
+  uint16_t seed_hash = 0;
+  is.read(reinterpret_cast<char*>(&preamble_longs), sizeof(preamble_longs));
+  is.read(reinterpret_cast<char*>(&serial_version), sizeof(serial_version));
+  is.read(reinterpret_cast<char*>(&type), sizeof(type));
+  is.read(reinterpret_cast<char*>(&unused16), sizeof(unused16));
+  is.read(reinterpret_cast<char*>(&flags_byte), sizeof(flags_byte));
+  is.read(reinterpret_cast<char*>(&seed_hash), sizeof(seed_hash));
+  // a short or failed read must not be reported as a type/version/seed mismatch
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  checker<true>::check_sketch_type(type, SKETCH_TYPE);
+  checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+  if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+  uint64_t theta = theta_constants::MAX_THETA;
+  uint32_t num_entries = 0;
+  if (!is_empty && preamble_longs == 1) {
+    num_entries = 1; // single-item form: the entry directly follows the first preamble long
+  } else if (!is_empty) {
+    uint32_t unused32 = 0;
+    is.read(reinterpret_cast<char*>(&num_entries), sizeof(num_entries));
+    is.read(reinterpret_cast<char*>(&unused32), sizeof(unused32));
+    if (preamble_longs > 2) is.read(reinterpret_cast<char*>(&theta), sizeof(theta));
+    // do not size the allocation below from a count that was only partially read
+    if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  }
+  std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+  if (!is_empty) is.read(reinterpret_cast<char*>(entries.data()), sizeof(uint64_t) * entries.size());
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  return compact_theta_sketch_experimental(is_empty, is_ordered, seed_hash, theta, std::move(entries));
+}
+
+// Reconstructs a compact theta sketch from a byte image of the given size. Every group of reads is
+// preceded by a size check so that a truncated buffer is rejected instead of being read past its end.
+template<typename A>
+compact_theta_sketch_experimental<A> compact_theta_sketch_experimental<A>::deserialize(const void* bytes, size_t size, uint64_t seed, const A& allocator) {
+  ensure_minimum_memory(size, 8); // the first preamble long (8 bytes) is read unconditionally
+  const char* ptr = static_cast<const char*>(bytes);
+  const char* base = ptr;
+  uint8_t preamble_longs;
+  ptr += copy_from_mem(ptr, &preamble_longs, sizeof(preamble_longs));
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
+  uint8_t type;
+  ptr += copy_from_mem(ptr, &type, sizeof(type));
+  uint16_t unused16;
+  ptr += copy_from_mem(ptr, &unused16, sizeof(unused16));
+  uint8_t flags_byte;
+  ptr += copy_from_mem(ptr, &flags_byte, sizeof(flags_byte));
+  uint16_t seed_hash;
+  ptr += copy_from_mem(ptr, &seed_hash, sizeof(seed_hash));
+  checker<true>::check_sketch_type(type, SKETCH_TYPE);
+  checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+
+  uint64_t theta = theta_constants::MAX_THETA;
+  uint32_t num_entries = 0;
+  if (!is_empty && preamble_longs == 1) {
+    num_entries = 1; // single-item form: the entry directly follows the first preamble long
+  } else if (!is_empty) {
+    ensure_minimum_memory(size, 16); // entry count and padding occupy bytes 8..15
+    ptr += copy_from_mem(ptr, &num_entries, sizeof(num_entries));
+    uint32_t unused32;
+    ptr += copy_from_mem(ptr, &unused32, sizeof(unused32));
+    if (preamble_longs > 2) {
+      ensure_minimum_memory(size, 24); // theta occupies bytes 16..23
+      ptr += copy_from_mem(ptr, &theta, sizeof(theta));
+    }
+  }
+  const size_t entries_size_bytes = sizeof(uint64_t) * num_entries;
+  check_memory_size(ptr - base + entries_size_bytes, size);
+  std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+  if (!is_empty) ptr += copy_from_mem(ptr, entries.data(), entries_size_bytes);
+
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+  return compact_theta_sketch_experimental(is_empty, is_ordered, seed_hash, theta, std::move(entries));
 }
 
 } /* namespace datasketches */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org