You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2023/01/13 03:38:58 UTC

[datasketches-cpp] branch theta_compression updated: stream serialization for compressed

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch theta_compression
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git


The following commit(s) were added to refs/heads/theta_compression by this push:
     new 2c1d9ef  stream serialization for compressed
2c1d9ef is described below

commit 2c1d9ef4640ab9920392313d4d507f44d888f717
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Thu Jan 12 19:38:53 2023 -0800

    stream serialization for compressed
---
 common/include/common_defs.hpp      |   2 +-
 theta/include/theta_sketch.hpp      |  30 +++-
 theta/include/theta_sketch_impl.hpp | 327 ++++++++++++++++++++++++------------
 theta/test/theta_sketch_test.cpp    |  32 +++-
 4 files changed, 275 insertions(+), 116 deletions(-)

diff --git a/common/include/common_defs.hpp b/common/include/common_defs.hpp
index 7437536..c066f63 100644
--- a/common/include/common_defs.hpp
+++ b/common/include/common_defs.hpp
@@ -77,7 +77,7 @@ static inline void read(std::istream& is, T* ptr, size_t size_bytes) {
 }
 
 template<typename T>
-static inline void write(std::ostream& os, T& value) {
+static inline void write(std::ostream& os, T value) {
   os.write(reinterpret_cast<const char*>(&value), sizeof(T));
 }
 
diff --git a/theta/include/theta_sketch.hpp b/theta/include/theta_sketch.hpp
index 8950b63..b374d14 100644
--- a/theta/include/theta_sketch.hpp
+++ b/theta/include/theta_sketch.hpp
@@ -318,7 +318,8 @@ public:
   using AllocBytes = typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>;
   using vector_bytes = std::vector<uint8_t, AllocBytes>;
 
-  static const uint8_t SERIAL_VERSION = 3;
+  static const uint8_t UNCOMPRESSED_SERIAL_VERSION = 3;
+  static const uint8_t COMPRESSED_SERIAL_VERSION = 4;
   static const uint8_t SKETCH_TYPE = 3;
 
   // Instances of this type can be obtained:
@@ -356,6 +357,23 @@ public:
    */
   vector_bytes serialize(unsigned header_size_bytes = 0) const;
 
+  /**
+   * This method serializes the sketch into a given stream in a compressed binary form.
+   * Compression is applied to ordered sketches, except those with no entries or with a single entry outside of estimation mode.
+   * For unordered sketches and for these exceptions this method is equivalent to serialize().
+   * @param os output stream
+   */
+  void serialize_compressed(std::ostream& os) const;
+
+  /**
+   * This method serializes the sketch as a vector of bytes.
+   * An optional header can be reserved in front of the sketch.
+   * It is an uninitialized space of a given size.
+   * This header is used in Datasketches PostgreSQL extension.
+   * Compression is applied to ordered sketches, except those with no entries or with a single entry outside of estimation mode.
+   * For unordered sketches and for these exceptions this method is equivalent to serialize().
+   * @param header_size_bytes space to reserve in front of the sketch
+   */
   vector_bytes serialize_compressed(unsigned header_size_bytes = 0) const;
 
   virtual iterator begin();
@@ -394,7 +412,15 @@ private:
   uint64_t theta_;
   std::vector<uint64_t, Allocator> entries_;
 
-  vector_bytes serialize_version_4_MLZ(unsigned header_size_bytes = 0) const;
+  bool is_suitable_for_compression() const;
+  uint8_t compute_min_leading_zeros() const;
+  void serialize_version_4(std::ostream& os) const;
+  vector_bytes serialize_version_4(unsigned header_size_bytes = 0) const;
+
+  static compact_theta_sketch_alloc deserialize_v1(uint8_t preamble_longs, std::istream& is, uint64_t seed, const Allocator& allocator);
+  static compact_theta_sketch_alloc deserialize_v2(uint8_t preamble_longs, std::istream& is, uint64_t seed, const Allocator& allocator);
+  static compact_theta_sketch_alloc deserialize_v3(uint8_t preamble_longs, std::istream& is, uint64_t seed, const Allocator& allocator);
+  static compact_theta_sketch_alloc deserialize_v4(uint8_t preamble_longs, std::istream& is, uint64_t seed, const Allocator& allocator);
 
   virtual void print_specifics(std::ostringstream& os) const;
 };
diff --git a/theta/include/theta_sketch_impl.hpp b/theta/include/theta_sketch_impl.hpp
index ed50bba..888fd09 100644
--- a/theta/include/theta_sketch_impl.hpp
+++ b/theta/include/theta_sketch_impl.hpp
@@ -345,12 +345,9 @@ template<typename A>
 void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {
   const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
   write(os, preamble_longs);
-  const uint8_t serial_version = SERIAL_VERSION;
-  write(os, serial_version);
-  const uint8_t type = SKETCH_TYPE;
-  write(os, type);
-  const uint16_t unused16 = 0;
-  write(os, unused16);
+  write(os, UNCOMPRESSED_SERIAL_VERSION);
+  write(os, SKETCH_TYPE);
+  write<uint16_t>(os, 0); // unused
   const uint8_t flags_byte(
     (1 << flags::IS_COMPACT) |
     (1 << flags::IS_READ_ONLY) |
@@ -358,13 +355,10 @@ void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {
     (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
   );
   write(os, flags_byte);
-  const uint16_t seed_hash = get_seed_hash();
-  write(os, seed_hash);
+  write(os, get_seed_hash());
   if (preamble_longs > 1) {
-    const uint32_t num_entries = static_cast<uint32_t>(entries_.size());
-    write(os, num_entries);
-    const uint32_t unused32 = 0;
-    write(os, unused32);
+    write(os, static_cast<uint32_t>(entries_.size()));
+    write<uint32_t>(os, 0); // unused
   }
   if (this->is_estimation_mode()) write(os, this->theta_);
   if (entries_.size() > 0) write(os, entries_.data(), entries_.size() * sizeof(uint64_t));
@@ -379,7 +373,7 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned header_size_bytes) const
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
   ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = SERIAL_VERSION;
+  const uint8_t serial_version = UNCOMPRESSED_SERIAL_VERSION;
   ptr += copy_to_mem(serial_version, ptr);
   const uint8_t type = SKETCH_TYPE;
   ptr += copy_to_mem(type, ptr);
@@ -404,16 +398,27 @@ auto compact_theta_sketch_alloc<A>::serialize(unsigned header_size_bytes) const
 }
 
 template<typename A>
-auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned header_size_bytes) const -> vector_bytes {
+bool compact_theta_sketch_alloc<A>::is_suitable_for_compression() const {
   if (!this->is_ordered() || entries_.size() == 0 ||
-      (entries_.size() == 1 && !this->is_estimation_mode())) return serialize(header_size_bytes);
-  return serialize_version_4_MLZ(header_size_bytes);
+        (entries_.size() == 1 && !this->is_estimation_mode())) return false;
+  return true;
 }
 
 template<typename A>
-auto compact_theta_sketch_alloc<A>::serialize_version_4_MLZ(unsigned header_size_bytes) const -> vector_bytes {
-  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
-  // compression based on leading zeros in deltas between ordered hash values
+void compact_theta_sketch_alloc<A>::serialize_compressed(std::ostream& os) const {
+  if (!is_suitable_for_compression()) return serialize(os); // regular layout when compression does not apply
+  serialize_version_4(os); // compressed layout
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned header_size_bytes) const -> vector_bytes {
+  if (!is_suitable_for_compression()) return serialize(header_size_bytes); // regular layout
+  return serialize_version_4(header_size_bytes); // compressed layout
+}
+
+template<typename A>
+uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
+  // compression is based on leading zeros in deltas between ordered hash values
   // assumes ordered sketch
   uint64_t previous = 0;
   uint64_t ored = 0;
@@ -422,7 +427,66 @@ auto compact_theta_sketch_alloc<A>::serialize_version_4_MLZ(unsigned header_size
     ored |= delta;
     previous = entry;
   }
-  uint8_t min_entry_zeros = count_leading_zeros_in_u64(ored);
+  return count_leading_zeros_in_u64(ored);
+}
+
+template<typename A>
+void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const {
+  // writes the compressed (serial version 4) form: header, optional theta, num_entries, then bit-packed deltas
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  const uint8_t min_entry_zeros = compute_min_leading_zeros();
+  const uint32_t num_entries = static_cast<uint32_t>(entries_.size());
+  // store num_entries as whole bytes since whole-byte blocks will follow (most probably)
+  const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(num_entries));
+
+  write(os, preamble_longs);
+  write(os, COMPRESSED_SERIAL_VERSION);
+  write(os, SKETCH_TYPE);
+  write(os, min_entry_zeros);
+  write(os, num_entries_bytes);
+  const uint8_t flags_byte(
+    (1 << flags::IS_COMPACT) |
+    (1 << flags::IS_READ_ONLY) |
+    (this->is_empty() ? 1 << flags::IS_EMPTY : 0) |
+    (this->is_ordered() ? 1 << flags::IS_ORDERED : 0)
+  );
+  write(os, flags_byte);
+  write(os, get_seed_hash());
+  if (this->is_estimation_mode()) write(os, this->theta_);
+  for (unsigned i = 0; i < num_entries_bytes; ++i) { // least significant byte first
+    write(os, static_cast<uint8_t>((num_entries >> (i << 3)) & 0xff));
+  }
+
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  uint64_t previous = 0;
+  uint64_t deltas[8];
+  vector_bytes buffer(entry_bits, 0, entries_.get_allocator()); // block of 8 entries takes entry_bits bytes
+  // full blocks of 8 deltas are packed and written one block at a time
+  unsigned i;
+  for (i = 0; i + 7 < entries_.size(); i += 8) {
+    for (unsigned j = 0; j < 8; ++j) {
+      deltas[j] = entries_[i + j] - previous;
+      previous = entries_[i + j];
+    }
+    pack_bits_block8(deltas, buffer.data(), entry_bits);
+    write(os, buffer.data(), buffer.size());
+  }
+  // the remaining (fewer than 8) deltas are packed one at a time into the same buffer
+  uint8_t offset = 0;
+  uint8_t* ptr = buffer.data();
+  for (; i < entries_.size(); ++i) {
+    const uint64_t delta = entries_[i] - previous;
+    previous = entries_[i];
+    offset = pack_bits(delta, entry_bits, ptr, offset);
+  }
+  if (offset > 0) ++ptr; // count the partially filled last byte (assumes pack_bits advances ptr past full bytes only - TODO confirm)
+  write(os, buffer.data(), ptr - buffer.data()); // deserialize_v4 reads the tail rounded up to whole bytes
+}
+
+template<typename A>
+auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes {
+  const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
+  uint8_t min_entry_zeros = compute_min_leading_zeros();
   size_t compressed_bits = (64 - min_entry_zeros) * entries_.size();
 
   // store num_entries as whole bytes since whole-byte blocks will follow (most probably)
@@ -434,7 +498,7 @@ auto compact_theta_sketch_alloc<A>::serialize_version_4_MLZ(unsigned header_size
   uint8_t* ptr = bytes.data() + header_size_bytes;
 
   ptr += copy_to_mem(preamble_longs, ptr);
-  const uint8_t serial_version = 4;
+  const uint8_t serial_version = COMPRESSED_SERIAL_VERSION;
   ptr += copy_to_mem<uint8_t>(serial_version, ptr);
   ptr += copy_to_mem(SKETCH_TYPE, ptr);
   ptr += copy_to_mem(min_entry_zeros, ptr);
@@ -457,8 +521,7 @@ auto compact_theta_sketch_alloc<A>::serialize_version_4_MLZ(unsigned header_size
   }
 
   const uint8_t entry_bits = 64 - min_entry_zeros;
-
-  previous = 0;
+  uint64_t previous = 0;
   uint64_t deltas[8];
 
   unsigned i;
@@ -485,101 +548,145 @@ compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize(std::is
   const auto preamble_longs = read<uint8_t>(is);
   const auto serial_version = read<uint8_t>(is);
   const auto type = read<uint8_t>(is);
+  checker<true>::check_sketch_type(type, SKETCH_TYPE);
   switch (serial_version) {
-  case SERIAL_VERSION: {
-      read<uint16_t>(is); // unused
-      const auto flags_byte = read<uint8_t>(is);
-      const auto seed_hash = read<uint16_t>(is);
-      checker<true>::check_sketch_type(type, SKETCH_TYPE);
-      checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
-      const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
-      if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
-
-      uint64_t theta = theta_constants::MAX_THETA;
-      uint32_t num_entries = 0;
-      if (!is_empty) {
-        if (preamble_longs == 1) {
-          num_entries = 1;
-        } else {
-          num_entries = read<uint32_t>(is);
-          read<uint32_t>(is); // unused
-          if (preamble_longs > 2) {
-            theta = read<uint64_t>(is);
-          }
-        }
-      }
-      std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-      if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * entries.size());
+    case 4:
+      return deserialize_v4(preamble_longs, is, seed, allocator);
+    case 3:
+      return deserialize_v3(preamble_longs, is, seed, allocator);
+    case 1:
+      return deserialize_v1(preamble_longs, is, seed, allocator);
+    case 2:
+      return deserialize_v2(preamble_longs, is, seed, allocator);
+    default:
+      throw std::invalid_argument("unexpected sketch serialization version " + std::to_string(serial_version));
+  }
+}
 
-      const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+template<typename A>
+compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v1(
+    uint8_t, std::istream& is, uint64_t seed, const A& allocator) {
+  // the seed hash is computed from the given seed; none is read from the stream here
+  const auto hash_of_seed = compute_seed_hash(seed);
+  read<uint8_t>(is); // unused
+  read<uint32_t>(is); // unused
+  const auto retained = read<uint32_t>(is);
+  read<uint32_t>(is); // unused
+  const auto theta = read<uint64_t>(is);
+  std::vector<uint64_t, A> entries(retained, 0, allocator);
+  const bool empty = retained == 0 && theta == theta_constants::MAX_THETA;
+  if (!empty) read(is, entries.data(), sizeof(uint64_t) * entries.size());
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  return compact_theta_sketch_alloc(empty, true, hash_of_seed, theta, std::move(entries)); // second argument: is_ordered
+}
+
+template<typename A>
+compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v2(
+    uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator)
+{
+  read<uint8_t>(is); // unused
+  read<uint16_t>(is); // unused
+  const uint16_t seed_hash = read<uint16_t>(is);
+  checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+  if (preamble_longs == 1) {
+    if (!is.good()) throw std::runtime_error("error reading from std::istream");
+    std::vector<uint64_t, A> entries(0, 0, allocator);
+    return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
+  } else if (preamble_longs == 2) {
+    const uint32_t num_entries = read<uint32_t>(is);
+    read<uint32_t>(is); // unused
+    std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+    if (num_entries == 0) {
+      return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
+    }
+    read(is, entries.data(), entries.size() * sizeof(uint64_t));
+    if (!is.good()) throw std::runtime_error("error reading from std::istream");
+    return compact_theta_sketch_alloc(false, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
+  } else if (preamble_longs == 3) {
+    const uint32_t num_entries = read<uint32_t>(is);
+    read<uint32_t>(is); // unused
+    const auto theta = read<uint64_t>(is);
+    bool is_empty = (num_entries == 0) && (theta == theta_constants::MAX_THETA);
+    std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+    if (is_empty) {
       if (!is.good()) throw std::runtime_error("error reading from std::istream");
-      return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries));
+      return compact_theta_sketch_alloc(true, true, seed_hash, theta, std::move(entries));
+    } else {
+      read(is, entries.data(), sizeof(uint64_t) * entries.size());
+      if (!is.good()) throw std::runtime_error("error reading from std::istream");
+      return compact_theta_sketch_alloc(false, true, seed_hash, theta, std::move(entries));
+    }
+  } else {
+    throw std::invalid_argument(std::to_string(preamble_longs) + " longs of premable, but expected 1, 2, or 3");
   }
-  case 1: {
-      const auto seed_hash = compute_seed_hash(seed);
-      checker<true>::check_sketch_type(type, SKETCH_TYPE);
-      read<uint8_t>(is); // unused
+}
+
+template<typename A>
+compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v3(
+    uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator)
+{
+  read<uint16_t>(is); // unused
+  const auto flags_byte = read<uint8_t>(is);
+  const auto seed_hash = read<uint16_t>(is);
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+  uint64_t theta = theta_constants::MAX_THETA;
+  uint32_t num_entries = 0;
+  if (!is_empty) {
+    if (preamble_longs == 1) {
+      num_entries = 1;
+    } else {
+      num_entries = read<uint32_t>(is);
       read<uint32_t>(is); // unused
-      const auto num_entries = read<uint32_t>(is);
-      read<uint32_t>(is); //unused
-      const auto theta = read<uint64_t>(is);
-      std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-      bool is_empty = (num_entries == 0) && (theta == theta_constants::MAX_THETA);
-      if (!is_empty)
-          read(is, entries.data(), sizeof(uint64_t) * entries.size());
-      if (!is.good())
-          throw std::runtime_error("error reading from std::istream");
-      return compact_theta_sketch_alloc(is_empty, true, seed_hash, theta, std::move(entries));
+      if (preamble_longs > 2) theta = read<uint64_t>(is);
+    }
   }
-  case 2: {
-      checker<true>::check_sketch_type(type, SKETCH_TYPE);
-      read<uint8_t>(is); // unused
-      read<uint16_t>(is); // unused
-      const uint16_t seed_hash = read<uint16_t>(is);
-      checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
-      if (preamble_longs == 1) {
-          if (!is.good())
-              throw std::runtime_error("error reading from std::istream");
-          std::vector<uint64_t, A> entries(0, 0, allocator);
-          return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
-      } else if (preamble_longs == 2) {
-          const uint32_t num_entries = read<uint32_t>(is);
-          read<uint32_t>(is); // unused
-          std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-          if (num_entries == 0) {
-              return compact_theta_sketch_alloc(true, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
-          }
-          read(is, entries.data(), entries.size() * sizeof(uint64_t));
-          if (!is.good())
-              throw std::runtime_error("error reading from std::istream");
-          return compact_theta_sketch_alloc(false, true, seed_hash, theta_constants::MAX_THETA, std::move(entries));
-      } else if (preamble_longs == 3) {
-          const uint32_t num_entries = read<uint32_t>(is);
-          read<uint32_t>(is); // unused
-          const auto theta = read<uint64_t>(is);
-          bool is_empty = (num_entries == 0) && (theta == theta_constants::MAX_THETA);
-          std::vector<uint64_t, A> entries(num_entries, 0, allocator);
-          if (is_empty) {
-              if (!is.good())
-                  throw std::runtime_error("error reading from std::istream");
-              return compact_theta_sketch_alloc(true, true, seed_hash, theta, std::move(entries));
-          } else {
-              read(is, entries.data(), sizeof(uint64_t) * entries.size());
-              if (!is.good())
-                  throw std::runtime_error("error reading from std::istream");
-              return compact_theta_sketch_alloc(false, true, seed_hash, theta, std::move(entries));
-          }
-      } else {
-          throw std::invalid_argument(std::to_string(preamble_longs) + " longs of premable, but expected 1, 2, or 3");
-      }
+  std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+  if (!is_empty) read(is, entries.data(), sizeof(uint64_t) * entries.size());
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries));
+}
+
+template<typename A>
+compact_theta_sketch_alloc<A> compact_theta_sketch_alloc<A>::deserialize_v4(
+    uint8_t preamble_longs, std::istream& is, uint64_t seed, const A& allocator)
+{ // reads the rest of a serial version 4 (compressed) image; deserialize() has already consumed the first 3 bytes
+  const auto min_entry_zeros = read<uint8_t>(is);
+  const auto num_entries_bytes = read<uint8_t>(is);
+  const auto flags_byte = read<uint8_t>(is);
+  const auto seed_hash = read<uint16_t>(is);
+  if (min_entry_zeros > 64 || num_entries_bytes > sizeof(uint32_t)) throw std::invalid_argument("corrupted compressed sketch header"); // keeps entry_bits and the shifts below in range
+  const bool is_empty = flags_byte & (1 << flags::IS_EMPTY);
+  if (!is_empty) checker<true>::check_seed_hash(seed_hash, compute_seed_hash(seed));
+  const uint64_t theta = preamble_longs > 1 ? read<uint64_t>(is) : theta_constants::MAX_THETA;
+  uint32_t num_entries = 0;
+  for (unsigned i = 0; i < num_entries_bytes; ++i) { // least significant byte first
+    num_entries |= static_cast<uint32_t>(read<uint8_t>(is)) << (i << 3); // cast first: uint8_t would be promoted to signed int
   }
-  default:
-      // this should always fail since the valid cases are handled above
-      checker<true>::check_serial_version(serial_version, SERIAL_VERSION);
-      // this throw is never reached, because check_serial_version will throw an informative exception.
-      // This is only here to avoid a compiler warning about a path without a return value.
-      throw std::invalid_argument("unexpected sketch serialization version");
+  if (!is.good()) throw std::runtime_error("error reading from std::istream"); // do not size containers from a header that was not fully read
+  const uint8_t entry_bits = 64 - min_entry_zeros;
+  vector_bytes buffer(entry_bits, 0, allocator); // block of 8 entries takes entry_bits bytes
+  std::vector<uint64_t, A> entries(num_entries, 0, allocator);
+  unsigned i;
+  for (i = 0; i + 7 < num_entries; i += 8) {
+    read(is, buffer.data(), buffer.size());
+    unpack_bits_block8(&entries[i], buffer.data(), entry_bits);
+  }
+  read(is, buffer.data(), whole_bytes_to_hold_bits((num_entries - i) * entry_bits)); // tail of fewer than 8 entries
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  const uint8_t* ptr = buffer.data();
+  uint8_t offset = 0;
+  for (; i < num_entries; ++i) {
+    offset = unpack_bits(entries[i], entry_bits, ptr, offset);
+  }
+  uint64_t previous = 0; // undo deltas: each unpacked value is the difference from the previous entry
+  for (i = 0; i < num_entries; ++i) {
+    entries[i] += previous;
+    previous = entries[i];
   }
+  const bool is_ordered = flags_byte & (1 << flags::IS_ORDERED);
+  return compact_theta_sketch_alloc(is_empty, is_ordered, seed_hash, theta, std::move(entries));
 }
 
 template<typename A>
diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp
index f57e0e5..5ba2889 100644
--- a/theta/test/theta_sketch_test.cpp
+++ b/theta/test/theta_sketch_test.cpp
@@ -703,13 +703,39 @@ TEST_CASE("theta sketch: wrap compact v2 estimation from java", "[theta_sketch]"
   }
 }
 
-TEST_CASE("theta sketch: wrapped compressed", "[theta_sketch]") {
+TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
   auto update_sketch = update_theta_sketch::builder().build();
   for (int i = 0; i < 10000; i++) update_sketch.update(i);
   auto compact_sketch = update_sketch.compact();
+
   auto bytes = compact_sketch.serialize_compressed();
-  auto wrapped_compressed = wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size());
-  auto iter = wrapped_compressed.begin();
+  { // deserialize bytes
+    auto deserialized_sketch = compact_theta_sketch::deserialize(bytes.data(), bytes.size());
+    REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
+    REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta());
+    auto iter = deserialized_sketch.begin();
+    for (const auto key: compact_sketch) {
+      REQUIRE(*iter == key);
+      ++iter;
+    }
+  }
+  { // wrap bytes
+    auto wrapped_sketch = wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size());
+    REQUIRE(wrapped_sketch.get_num_retained() == compact_sketch.get_num_retained());
+    REQUIRE(wrapped_sketch.get_theta() == compact_sketch.get_theta());
+    auto iter = wrapped_sketch.begin();
+    for (const auto key: compact_sketch) {
+      REQUIRE(*iter == key);
+      ++iter;
+    }
+  }
+
+  std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+  compact_sketch.serialize_compressed(s);
+  auto deserialized_sketch = compact_theta_sketch::deserialize(s);
+  REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
+  REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta());
+  auto iter = deserialized_sketch.begin();
   for (const auto key: compact_sketch) {
     REQUIRE(*iter == key);
     ++iter;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org