You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by GitBox <gi...@apache.org> on 2020/02/18 00:56:49 UTC

[GitHub] [incubator-datasketches-cpp] jmalkin commented on a change in pull request #87: Varopt Sampling

jmalkin commented on a change in pull request #87: Varopt Sampling
URL: https://github.com/apache/incubator-datasketches-cpp/pull/87#discussion_r380410762
 
 

 ##########
 File path: sampling/include/var_opt_sketch_impl.hpp
 ##########
 @@ -0,0 +1,1591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _VAR_OPT_SKETCH_IMPL_HPP_
+#define _VAR_OPT_SKETCH_IMPL_HPP_
+
+#include "var_opt_sketch.hpp"
+#include "serde.hpp"
+#include "bounds_binomial_proportions.hpp"
+
+#include <memory>
+#include <sstream>
+#include <cmath>
+#include <random>
+#include <algorithm>
+
+namespace datasketches {
+
+/**
+ * Implementation code for the VarOpt sketch.
+ * 
+ * author Kevin Lang 
+ * author Jon Malkin
+ */
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf) :
+  var_opt_sketch<T,S,A>(k, rf, false) {}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other) :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(other.n_),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(nullptr),
+  weights_(nullptr),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(nullptr)
+  {
+    data_ = A().allocate(curr_items_alloc_);
+    if (other.filled_data_) {
+      // copy everything
+      for (size_t i = 0; i < curr_items_alloc_; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+    } else {
+      // skip gap or anything unused at the end
+      for (size_t i = 0; i < h_; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+      for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+    }
+    
+    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    // doubles so can successfully copy regardless of the internal state
+    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
+    
+    if (other.marks_ != nullptr) {
+      marks_ = AllocBool().allocate(curr_items_alloc_);
+      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+    }
+  }
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(adjusted_n),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(nullptr),
+  weights_(nullptr),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(nullptr)
+  {
+    data_ = A().allocate(curr_items_alloc_);
+    if (other.filled_data_) {
+      // copy everything
+      for (size_t i = 0; i < curr_items_alloc_; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+    } else {
+      // skip gap or anything unused at the end
+      for (size_t i = 0; i < h_; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+      for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i)
+        A().construct(&data_[i], T(other.data_[i]));
+    }
+
+    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    // doubles so can successfully copy regardless of the internal state
+    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
+
+    if (!as_sketch && other.marks_ != nullptr) {
+      marks_ = AllocBool().allocate(curr_items_alloc_);
+      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+    }
+  }
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(T* data, double* weights, size_t len,
+                                      uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r) :
+  k_(k),
+  h_(h_count),
+  m_(0),
+  r_(r_count),
+  n_(n),
+  total_wt_r_(total_wt_r),
+  rf_(DEFAULT_RESIZE_FACTOR),
+  curr_items_alloc_(len),
+  filled_data_(n > k),
+  data_(data),
+  weights_(weights),
+  num_marks_in_h_(0),
+  marks_(nullptr)
+  {}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(var_opt_sketch&& other) noexcept :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(other.n_),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(other.data_),
+  weights_(other.weights_),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(other.marks_)
+  {
+    other.data_ = nullptr;
+    other.weights_ = nullptr;
+    other.marks_ = nullptr;
+  }
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget) :
+  k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf) {
+  if (k == 0 || k_ > MAX_K) {
+    throw std::invalid_argument("k must be at least 1 and less than 2^31 - 1");
+  }
+
+  uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
+  int initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
+  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
+  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
+    ++curr_items_alloc_;
+  }
+
+  allocate_data_arrays(curr_items_alloc_, is_gadget);
+  num_marks_in_h_ = 0;
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::~var_opt_sketch() {
+  if (data_ != nullptr) {
+    if (filled_data_) {
+      // destroy everything
+      for (size_t i = 0; i < curr_items_alloc_; ++i) {
+        A().destroy(data_ + i);      
+      }
+    } else {
+      // skip gap or anything unused at the end
+      for (size_t i = 0; i < h_; ++i) {
+        A().destroy(data_+ i);
+      }
+    
+      for (size_t i = h_ + 1; i < h_ + r_ + 1; ++i) {
+        A().destroy(data_ + i);
+      }
+    }
+    A().deallocate(data_, curr_items_alloc_);
+  }
+
+  if (weights_ != nullptr) {
+    AllocDouble().deallocate(weights_, curr_items_alloc_);
+  }
+  
+  if (marks_ != nullptr) {
+    AllocBool().deallocate(marks_, curr_items_alloc_);
+  }
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(const var_opt_sketch& other) {
+  var_opt_sketch<T,S,A> sk_copy(other);
+  std::swap(k_, sk_copy.k_);
+  std::swap(h_, sk_copy.h_);
+  std::swap(m_, sk_copy.m_);
+  std::swap(r_, sk_copy.r_);
+  std::swap(n_, sk_copy.n_);
+  std::swap(total_wt_r_, sk_copy.total_wt_r_);
+  std::swap(rf_, sk_copy.rf_);
+  std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
+  std::swap(filled_data_, sk_copy.filled_data_);
+  std::swap(data_, sk_copy.data_);
+  std::swap(weights_, sk_copy.weights_);
+  std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
+  std::swap(marks_, sk_copy.marks_);
+  return *this;
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(var_opt_sketch&& other) {
+  std::swap(k_, other.k_);
+  std::swap(h_, other.h_);
+  std::swap(m_, other.m_);
+  std::swap(r_, other.r_);
+  std::swap(n_, other.n_);
+  std::swap(total_wt_r_, other.total_wt_r_);
+  std::swap(rf_, other.rf_);
+  std::swap(curr_items_alloc_, other.curr_items_alloc_);
+  std::swap(filled_data_, other.filled_data_);
+  std::swap(data_, other.data_);
+  std::swap(weights_, other.weights_);
+  std::swap(num_marks_in_h_, other.num_marks_in_h_);
+  std::swap(marks_, other.marks_);
+  return *this;
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, uint8_t preamble_longs, std::istream& is) :
+  k_(k), m_(0), rf_(rf) {
+
+  // second and third prelongs
+  is.read((char*)&n_, sizeof(uint64_t));
+  is.read((char*)&h_, sizeof(uint32_t));
+  is.read((char*)&r_, sizeof(uint32_t));
+
+  validate_and_set_current_size(preamble_longs);
+
+  // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
+  if (preamble_longs == PREAMBLE_LONGS_FULL) { 
+    is.read((char*)&total_wt_r_, sizeof(total_wt_r_));
+    if (isnan(total_wt_r_) || r_ == 0 || total_wt_r_ <= 0.0) {
+      throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
+       "Found r = " + std::to_string(r_) + ", R region weight = " + std::to_string(total_wt_r_));
+    }
+  } else {
+    total_wt_r_ = 0.0;
+  }
+
+  allocate_data_arrays(curr_items_alloc_, is_gadget);
+
+  // read the first h_ weights
+  is.read((char*)weights_, h_ * sizeof(double));
+  for (size_t i = 0; i < h_; ++i) {
+    if (!(weights_[i] > 0.0)) {
+      throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(weights_[i]));
+    }
+  }
+
+  std::fill(&weights_[h_], &weights_[curr_items_alloc_], -1.0);
+
+  // read the first h_ marks as packed bytes iff we have a gadget
+  num_marks_in_h_ = 0;
+  if (is_gadget) {
+    uint8_t val = 0;
+    for (int i = 0; i < h_; ++i) {
+      if ((i & 0x7) == 0x0) { // should trigger on first iteration
+        is.read((char*)&val, sizeof(val));
+      }
+      marks_[i] = ((val >> (i & 0x7)) & 0x1) == 1;
+      num_marks_in_h_ += (marks_[i] ? 1 : 0);
+    }
+  }
+
+  // read the sample items, skipping the gap. Either h_ or r_ may be 0
+  S().deserialize(is, data_, h_); // aka &data_[0]
+  S().deserialize(is, &data_[h_ + 1], r_);
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, uint8_t preamble_longs,
+                                      const void* bytes, size_t size) : k_(k), m_(0), rf_(rf) {
+  // private constructor so we assume not called if sketch is empty
+  const char* ptr = static_cast<const char*>(bytes) + sizeof(uint64_t);
+
+  // second and third prelongs
+  ptr += copy_from_mem(ptr, &n_, sizeof(n_));
+  ptr += copy_from_mem(ptr, &h_, sizeof(h_));
+  ptr += copy_from_mem(ptr, &r_, sizeof(r_));
+
+  validate_and_set_current_size(preamble_longs);
+  
+  // current_items_alloc_ is set but validate R region weight (4th prelong), if needed, before allocating
+  if (preamble_longs == PREAMBLE_LONGS_FULL) {
+    ptr += copy_from_mem(ptr, &total_wt_r_, sizeof(total_wt_r_));
+    if (isnan(total_wt_r_) || r_ == 0 || total_wt_r_ <= 0.0) {
+      throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
+       "Found r = " + std::to_string(r_) + ", R region weight = " + std::to_string(total_wt_r_));
+    }
+  } else {
+    total_wt_r_ = 0.0;
+  }
+
+  allocate_data_arrays(curr_items_alloc_, is_gadget);
+
+  // read the first h_ weights, fill in rest of array with -1.0
+  ptr += copy_from_mem(ptr, weights_, h_ * sizeof(double));
+  for (size_t i = 0; i < h_; ++i) {
+    if (!(weights_[i] > 0.0)) {
+      throw std::invalid_argument("Possible corruption: Non-positive weight when deserializing: " + std::to_string(weights_[i]));
+    }
+  }
+  std::fill(&weights_[h_], &weights_[curr_items_alloc_], -1.0);
+  
+  // read the first h_ marks as packed bytes iff we have a gadget
+  num_marks_in_h_ = 0;
+  if (is_gadget) {
+    uint8_t val = 0;
+    for (int i = 0; i < h_; ++i) {
+     if ((i & 0x7) == 0x0) { // should trigger on first iteration
+        ptr += copy_from_mem(ptr, &val, sizeof(val));
+      }
+      marks_[i] = ((val >> (i & 0x7)) & 0x1) == 1;
+      num_marks_in_h_ += (marks_[i] ? 1 : 0);
+    }
+  }
+
+  // read the sample items, skipping the gap. Either h_ or r_ may be 0
+  ptr += S().deserialize(ptr, data_, h_); // ala data_[0]
+  ptr += S().deserialize(ptr, &data_[h_ + 1], r_);
+}
+
+/*
+ * An empty sketch requires 8 bytes.
+ *
+ * <pre>
+ * Long || Start Byte Adr:
+ * Adr:
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
+ * </pre>
+ *
+ * A non-empty sketch requires 24 bytes of preamble for an under-full sample; once there are
+ * at least k items the sketch uses 32 bytes of preamble.
+ *
+ * The count of items seen is limited to 48 bits (~256 trillion) even though there are adjacent
+ * unused preamble bits. The acceptance probability for an item is a double in the range [0,1),
+ * limiting us to 53 bits of randomness due to details of the IEEE floating point format. To
+ * ensure meaningful probabilities as the items seen count approaches capacity, we intentionally
+ * use slightly fewer bits.
+ * 
+ * Following the header are weights for the heavy items, then marks in the event this is a gadget.
+ * The serialized items come last.
+ * 
+ * <pre>
+ * Long || Start Byte Adr:
+ * Adr:
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
+ *
+ *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
+ *  1   ||---------------------------Items Seen Count (N)--------------------------------|
+ *
+ *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
+ *  2   ||-------------Item Count in H---------------|-------Item Count in R-------------|
+ *
+ *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
+ *  3   ||-------------------------------Total Weight in R-------------------------------|
+ * </pre>
+ */
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
+  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
+  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
+  num_bytes += h_ * sizeof(double);    // weights
+  if (marks_ != nullptr) {             // marks
+    num_bytes += (h_ / 8) + (h_ % 8 > 0);
+  }
+  num_bytes += (h_ + r_) * sizeof(TT); // the actual items
+  return num_bytes;
+}
+
+// implementation for all other types
+template<typename T, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
+  if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
+  size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
+  num_bytes += h_ * sizeof(double);    // weights
+  if (marks_ != nullptr) {             // marks
+    num_bytes += (h_ / 8) + (h_ % 8 > 0);
+  }
+  // must iterate over the items
+  for (auto& it: *this)
+    num_bytes += S().size_of_item(it.first);
+  return num_bytes;
+}
+
+template<typename T, typename S, typename A>
+std::vector<uint8_t, AllocU8<A>> var_opt_sketch<T,S,A>::serialize(unsigned header_size_bytes) const {
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  std::vector<uint8_t, AllocU8<A>> bytes(size);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+
+  bool empty = is_empty();
+  uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
+                                  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
+  uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
+  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
+
+  if (empty) {
+    flags |= EMPTY_FLAG_MASK;
+  }
+
+  // first prelong
+  uint8_t ser_ver(SER_VER);
+  uint8_t family(FAMILY_ID);
+  ptr += copy_to_mem(&first_byte, ptr, sizeof(uint8_t));
+  ptr += copy_to_mem(&ser_ver, ptr, sizeof(uint8_t));
+  ptr += copy_to_mem(&family, ptr, sizeof(uint8_t));
+  ptr += copy_to_mem(&flags, ptr, sizeof(uint8_t));
+  ptr += copy_to_mem(&k_, ptr, sizeof(uint32_t));
+
+  if (!empty) {
+    // second and third prelongs
+    ptr += copy_to_mem(&n_, ptr, sizeof(uint64_t));
+    ptr += copy_to_mem(&h_, ptr, sizeof(uint32_t));
+    ptr += copy_to_mem(&r_, ptr, sizeof(uint32_t));
+
+    // fourth prelong, if needed
+    if (r_ > 0) {
+      ptr += copy_to_mem(&total_wt_r_, ptr, sizeof(double));
+    }
+
+    // first h_ weights
+    ptr += copy_to_mem(weights_, ptr, h_ * sizeof(double));
+
+    // first h_ marks as packed bytes iff we have a gadget
+    if (marks_ != nullptr) {
+      uint8_t val = 0;
+      for (int i = 0; i < h_; ++i) {
+        if (marks_[i]) {
+          val |= 0x1 << (i & 0x7);
+        }
+
+        if ((i & 0x7) == 0x7) {
+          ptr += copy_to_mem(&val, ptr, sizeof(uint8_t));
+          val = 0;
+        }
+      }
+
+      // write out any remaining values
+      if ((h_ & 0x7) > 0) {
+        ptr += copy_to_mem(&val, ptr, sizeof(uint8_t));
+      }
+    }
+
+    // write the sample items, skipping the gap. Either h_ or r_ may be 0
+    ptr += S().serialize(ptr, data_, h_);
+    ptr += S().serialize(ptr, &data_[h_ + 1], r_);
+  }
+  
+  size_t bytes_written = ptr - bytes.data();
+  if (bytes_written != size) {
+    throw std::logic_error("serialized size mismatch: " + std::to_string(bytes_written) + " != " + std::to_string(size));
+  }
+
+  return bytes;
+}
+
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::serialize(std::ostream& os) const {
+  bool empty = (h_ == 0) && (r_ == 0);
+
+  uint8_t preLongs = (empty ? PREAMBLE_LONGS_EMPTY
+                                  : (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL));
+  uint8_t first_byte = (preLongs & 0x3F) | ((static_cast<uint8_t>(rf_)) << 6);
+  uint8_t flags = (marks_ != nullptr ? GADGET_FLAG_MASK : 0);
+
+  if (empty) {
+    flags |= EMPTY_FLAG_MASK;
+  }
+
+  // first prelong
+  uint8_t ser_ver(SER_VER);
+  uint8_t family(FAMILY_ID);
+  os.write((char*)&first_byte, sizeof(uint8_t));
+  os.write((char*)&ser_ver, sizeof(uint8_t));
+  os.write((char*)&family, sizeof(uint8_t));
+  os.write((char*)&flags, sizeof(uint8_t));
+  os.write((char*)&k_, sizeof(uint32_t));
+
+  if (!empty) {
+    // second and third prelongs
+    os.write((char*)&n_, sizeof(uint64_t));
+    os.write((char*)&h_, sizeof(uint32_t));
+    os.write((char*)&r_, sizeof(uint32_t));
+    
+    // fourth prelong, if needed
+    if (r_ > 0) {
+      os.write((char*)&total_wt_r_, sizeof(double));
+    }
+
+    // write the first h_ weights
+    os.write((char*)weights_, h_ * sizeof(double));
+
+    // write the first h_ marks as packed bytes iff we have a gadget
+    if (marks_ != nullptr) {
+      uint8_t val = 0;
+      for (int i = 0; i < h_; ++i) {
+        if (marks_[i]) {
+          val |= 0x1 << (i & 0x7);
+        }
+
+        if ((i & 0x7) == 0x7) {
+          os.write((char*)&val, sizeof(uint8_t));
+          val = 0;
+        }
+      }
+
+      // write out any remaining values
+      if ((h_ & 0x7) > 0) {
+        os.write((char*)&val, sizeof(uint8_t));
+      }
+    }
+
+    // write the sample items, skipping the gap. Either h_ or r_ may be 0
+    S().serialize(os, data_, h_);
+    S().serialize(os, &data_[h_ + 1], r_);
+  }
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(const void* bytes, size_t size) {
+  const char* ptr = static_cast<const char*>(bytes);
+  uint8_t first_byte;
+  ptr += copy_from_mem(ptr, &first_byte, sizeof(first_byte));
+  uint8_t preamble_longs = first_byte & 0x3f;
+  resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
+  uint8_t family_id;
+  ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
+  uint8_t flags;
+  ptr += copy_from_mem(ptr, &flags, sizeof(flags));
+  uint32_t k;
+  ptr += copy_from_mem(ptr, &k, sizeof(k));
+
+  check_preamble_longs(preamble_longs, flags);
+  check_family_and_serialization_version(family_id, serial_version);
+
+  bool is_empty = flags & EMPTY_FLAG_MASK;
+  bool is_gadget = flags & GADGET_FLAG_MASK;
+
+  return is_empty ? var_opt_sketch<T,S,A>(k, rf, is_gadget) : var_opt_sketch<T,S,A>(k, rf, is_gadget, preamble_longs, bytes, size);
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A> var_opt_sketch<T,S,A>::deserialize(std::istream& is) {
+  uint8_t first_byte;
+  is.read((char*)&first_byte, sizeof(first_byte));
+  uint8_t preamble_longs = first_byte & 0x3f;
+  resize_factor rf = static_cast<resize_factor>((first_byte >> 6) & 0x03);
+  uint8_t serial_version;
+  is.read((char*)&serial_version, sizeof(serial_version));
+  uint8_t family_id;
+  is.read((char*)&family_id, sizeof(family_id));
+  uint8_t flags;
+  is.read((char*)&flags, sizeof(flags));
+  uint32_t k;
+  is.read((char*)&k, sizeof(k));
+
+  check_preamble_longs(preamble_longs, flags);
+  check_family_and_serialization_version(family_id, serial_version);
+
+  bool is_empty = flags & EMPTY_FLAG_MASK;
+  bool is_gadget = flags & GADGET_FLAG_MASK;
+
+  return is_empty ? var_opt_sketch<T,S,A>(k, rf, is_gadget) : var_opt_sketch<T,S,A>(k, rf, is_gadget, preamble_longs, is);
+}
+
+template<typename T, typename S, typename A>
+bool var_opt_sketch<T,S,A>::is_empty() const {
+  return (h_ == 0 && r_ == 0);
+}
+
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::reset() {
+  uint32_t prev_alloc = curr_items_alloc_;
+
+  uint32_t ceiling_lg_k = to_log_2(ceiling_power_of_2(k_));
+  int initial_lg_size = starting_sub_multiple(ceiling_lg_k, rf_, MIN_LG_ARR_ITEMS);
+  curr_items_alloc_ = get_adjusted_size(k_, 1 << initial_lg_size);
+  if (curr_items_alloc_ == k_) { // if full size, need to leave 1 for the gap
+    ++curr_items_alloc_;
+  }
+  
+  if (filled_data_) {
+    // destroy everything
+    for (size_t i = 0; i < curr_items_alloc_; ++i) 
+      A().destroy(data_ + i);      
+  } else {
+    // skip gap or anything unused at the end
+    for (size_t i = 0; i < h_; ++i)
+      A().destroy(data_+ i);
+    
+    for (size_t i = h_ + 1; i < curr_items_alloc_; ++i)
+      A().destroy(data_ + i);
+  }
+
+  if (curr_items_alloc_ < prev_alloc) {
+    bool is_gadget = (marks_ != nullptr);
+  
+    A().deallocate(data_, curr_items_alloc_);
+    AllocDouble().deallocate(weights_, curr_items_alloc_);
+  
+    if (marks_ != nullptr)
+      AllocBool().deallocate(marks_, curr_items_alloc_);
+
+    allocate_data_arrays(curr_items_alloc_, is_gadget);
+  }
+  
+  n_ = 0;
+  h_ = 0;
+  m_ = 0;
+  r_ = 0;
+  num_marks_in_h_ = 0;
+  total_wt_r_ = 0.0;
+  filled_data_ = false;
+}
+
+template<typename T, typename S, typename A>
+uint64_t var_opt_sketch<T,S,A>::get_n() const {
+  return n_;
+}
+
+template<typename T, typename S, typename A>
+uint32_t var_opt_sketch<T,S,A>::get_k() const {
+  return k_;
+}
+
+template<typename T, typename S, typename A>
+uint32_t var_opt_sketch<T,S,A>::get_num_samples() const {
+  uint32_t num_in_sketch = h_ + r_;
+  return (num_in_sketch < k_ ? num_in_sketch : k_);
+}
+
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::update(const T& item, double weight) {
+  update(item, weight, false);
+}
+
+/*
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::update(T&& item, double weight) {
+}
+*/
+
+template<typename T, typename S, typename A>
+std::ostream& var_opt_sketch<T,S,A>::to_stream(std::ostream& os) const {
+  os << "### VarOpt SUMMARY: " << std::endl;
+  os << "   k            : " << k_ << std::endl;
+  os << "   h            : " << h_ << std::endl;
+  os << "   r            : " << r_ << std::endl;
+  os << "   weight_r     : " << total_wt_r_ << std::endl;
+  os << "   Current size : " << curr_items_alloc_ << std::endl;
+  os << "   Resize factor: " << (1 << rf_) << std::endl;
+  os << "### END SKETCH SUMMARY" << std::endl;
+
+  return os;
+}
+
+template<typename T, typename S, typename A>
+std::ostream& var_opt_sketch<T,S,A>::items_to_stream(std::ostream& os) const {
+  os << "### Sketch Items" << std::endl;
+
+  uint32_t print_length = (n_ < k_ ? n_ : k_ + 1);
+  for (int i = 0; i < print_length; ++i) {
+    if (i == h_) {
+      os << i << ": GAP" << std::endl;
+    } else {
+      os << i << ": " << data_[i] << "\twt = " << weights_[i] << std::endl;
+    }
+  }
+
+  return os;
+}
+
+template <typename T, typename S, typename A>
+std::string var_opt_sketch<T,S,A>::to_string() const {
+  std::ostringstream ss;
+  to_stream(ss);
+  return ss.str();
+}
+
+template <typename T, typename S, typename A>
+std::string var_opt_sketch<T,S,A>::items_to_string() const {
+  std::ostringstream ss;
+  items_to_stream(ss);
+  return ss.str();
+}
+
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::update(const T& item, double weight, bool mark) {
+  if (weight <= 0.0) { 
+    throw std::invalid_argument("Item weights must be strictly positive. Found: "
+                                + std::to_string(weight));
+  }
+  ++n_;
+
+  if (r_ == 0) {
+    // exact mode
+    update_warmup_phase(item, weight, mark);
+  } else {
+    // sketch is in estimation mode so we can make the following check
+    assert(h_ == 0 || (peek_min() >= get_tau()));
+
+    // what tau would be if deletion candidates turn out to be R plus the new item
+    // note: (r_ + 1) - 1 is intentional
+    double hypothetical_tau = (weight + total_wt_r_) / ((r_ + 1) - 1);
+
+    // is new item's turn to be considered for reservoir?
+    double condition1 = (h_ == 0) || (weight <= peek_min());
+
+    // is new item light enough for reservoir?
+    double condition2 = weight < hypothetical_tau;
+  
+    if (condition1 && condition2) {
+      update_light(item, weight, mark);
+    } else if (r_ == 1) {
+      update_heavy_r_eq1(item, weight, mark);
+    } else {
+      update_heavy_general(item, weight, mark);
+    }
+  }
+}
+
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::update_warmup_phase(const T& item, double weight, bool mark) {
+  assert(r_ == 0);
+  assert(m_ == 0);
+  assert(h_ <= k_);
+
+  if (h_ >= curr_items_alloc_) {
+    grow_data_arrays();
+  }
+
+  // store items as they come in until full
+  A().construct(&data_[h_], T(item));
+  weights_[h_] = weight;
+  if (marks_ != nullptr) {
+    marks_[h_] = mark;
+  }
+  ++h_;
+  num_marks_in_h_ += mark ? 1 : 0;
+
+  // check if need to heapify
+  if (h_ > k_) {
+    transition_from_warmup();
+  }
+}
+
+/* In the "light" case the new item has weight <= old_tau, so
+   would appear to the right of the R items in a hypothetical reverse-sorted
+   list. It is easy to prove that it is light enough to be part of this
+   round's downsampling */
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::update_light(const T& item, double weight, bool mark) {
+  assert(r_ >= 1);
 
 Review comment:
   from (admittedly limited) conversations with people elsewhere, i suspect we're likely going to need to remove exceptions to make the library be considered more production-worthy

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org