You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by jm...@apache.org on 2020/01/27 22:36:58 UTC

[incubator-datasketches-cpp] branch sampling updated (e788651 -> 1ca5bad)

This is an automated email from the ASF dual-hosted git repository.

jmalkin pushed a change to branch sampling
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git.


    from e788651  [WIP, incomplete with a bunch of debug code] checkpointing varopt with basic sketch framework mostly coded. Lacks querying, unit tests, and unioning so lots left to do
     add 5afcdfd  added vector_bytes type alias for convenience
     add 8193d2a  added vector_bytes type alias for convenience
     add f491e69  Merge pull request #79 from apache/vector_alias
     add 5f1e947  small fix in debug print
     add 4b62cfc  moved implementation to separate files
     add 6e5e9a8  fixed some inconsistencies in creation, destruction and copying
     add 3d65f4b  fixed copy
     add 7daba8e  Merge pull request #80 from apache/kll_separated_impl
     new c459d83  Merge branch 'sampling' of https://github.com/apache/incubator-datasketches-cpp into sampling
     new 1ca5bad  [WIP] var opt union feature complete, very minimal testing

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 common/include/bounds_binomial_proportions.hpp     | 291 +++++++
 common/include/serde.hpp                           |   4 +-
 cpc/include/cpc_sketch.hpp                         |   3 +-
 fi/include/frequent_items_sketch.hpp               |   3 +-
 hll/include/HllSketch-internal.hpp                 |   2 +-
 hll/include/hll.hpp                                |  12 +-
 kll/CMakeLists.txt                                 |  11 +-
 kll/include/kll_helper.hpp                         | 267 +-----
 kll/include/kll_helper_impl.hpp                    | 319 +++++++
 kll/include/kll_quantile_calculator.hpp            | 163 +---
 kll/include/kll_quantile_calculator_impl.hpp       | 191 +++++
 kll/include/kll_sketch.hpp                         | 954 +--------------------
 .../{kll_sketch.hpp => kll_sketch_impl.hpp}        | 369 +-------
 kll/test/kll_sketch_validation.cpp                 |   6 +-
 sampling/include/var_opt_sketch.hpp                | 104 ++-
 sampling/include/var_opt_sketch_impl.hpp           | 420 +++++++--
 sampling/include/var_opt_union.hpp                 | 156 ++++
 sampling/include/var_opt_union_impl.hpp            | 366 ++++++++
 sampling/test/counting_allocator.hpp               | 109 ---
 sampling/test/var_opt_sketch_test.cpp              | 115 +--
 theta/include/theta_sketch.hpp                     |   9 +-
 21 files changed, 1895 insertions(+), 1979 deletions(-)
 create mode 100644 common/include/bounds_binomial_proportions.hpp
 create mode 100644 kll/include/kll_helper_impl.hpp
 create mode 100644 kll/include/kll_quantile_calculator_impl.hpp
 copy kll/include/{kll_sketch.hpp => kll_sketch_impl.hpp} (69%)
 create mode 100644 sampling/include/var_opt_union.hpp
 create mode 100644 sampling/include/var_opt_union_impl.hpp
 delete mode 100644 sampling/test/counting_allocator.hpp


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org


[incubator-datasketches-cpp] 01/02: Merge branch 'sampling' of https://github.com/apache/incubator-datasketches-cpp into sampling

Posted by jm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jmalkin pushed a commit to branch sampling
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit c459d833a133b8743e0c8f0302aa1da87f10b614
Merge: 7daba8e e788651
Author: Jon Malkin <jm...@users.noreply.github.com>
AuthorDate: Mon Jan 27 14:32:34 2020 -0800

    Merge branch 'sampling' of https://github.com/apache/incubator-datasketches-cpp into sampling

 CMakeLists.txt                           |    3 +-
 NOTICE                                   |    4 +
 sampling/CMakeLists.txt                  |   50 ++
 sampling/include/var_opt_sketch.hpp      |  204 +++++
 sampling/include/var_opt_sketch_impl.hpp | 1252 ++++++++++++++++++++++++++++++
 sampling/test/CMakeLists.txt             |   46 ++
 sampling/test/counting_allocator.hpp     |  109 +++
 sampling/test/var_opt_sketch_test.cpp    |  204 +++++
 8 files changed, 1871 insertions(+), 1 deletion(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org


[incubator-datasketches-cpp] 02/02: [WIP] var opt union feature complete, very minimal testing

Posted by jm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jmalkin pushed a commit to branch sampling
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 1ca5bad43704d0acbf70f21801e9ec86d756fe71
Author: Jon Malkin <jm...@users.noreply.github.com>
AuthorDate: Mon Jan 27 14:35:46 2020 -0800

    [WIP] var opt union feature complete, very minimal testing
---
 common/include/bounds_binomial_proportions.hpp | 291 +++++++++++++++++
 sampling/include/var_opt_sketch.hpp            | 104 ++++--
 sampling/include/var_opt_sketch_impl.hpp       | 420 +++++++++++++++++++++----
 sampling/include/var_opt_union.hpp             | 156 +++++++++
 sampling/include/var_opt_union_impl.hpp        | 366 +++++++++++++++++++++
 sampling/test/counting_allocator.hpp           | 109 -------
 sampling/test/var_opt_sketch_test.cpp          | 115 ++-----
 7 files changed, 1286 insertions(+), 275 deletions(-)

diff --git a/common/include/bounds_binomial_proportions.hpp b/common/include/bounds_binomial_proportions.hpp
new file mode 100644
index 0000000..8b00cbd
--- /dev/null
+++ b/common/include/bounds_binomial_proportions.hpp
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+#define _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+
+#include <cmath>
+#include <stdexcept>
+
+namespace datasketches {
+
+/**
+ * Confidence intervals for binomial proportions.
+ *
+ * <p>This class computes an approximation to the Clopper-Pearson confidence interval
+ * for a binomial proportion. Exact Clopper-Pearson intervals are strictly
+ * conservative, but these approximations are not.</p>
+ *
+ * <p>The main inputs are numbers <i>n</i> and <i>k</i>, which are not the same as other things
+ * that are called <i>n</i> and <i>k</i> in our sketching library. There is also a third
+ * parameter, numStdDev, that specifies the desired confidence level.</p>
+ * <ul>
+ * <li><i>n</i> is the number of independent randomized trials. It is given and therefore known.
+ * </li>
+ * <li><i>p</i> is the probability of a trial being a success. It is unknown.</li>
+ * <li><i>k</i> is the number of trials (out of <i>n</i>) that turn out to be successes. It is
+ * a random variable governed by a binomial distribution. After any given
+ * batch of <i>n</i> independent trials, the random variable <i>k</i> has a specific
+ * value which is observed and is therefore known.</li>
+ * <li><i>pHat</i> = <i>k</i> / <i>n</i> is an unbiased estimate of the unknown success
+ * probability <i>p</i>.</li>
+ * </ul>
+ *
+ * <p>Alternatively, consider a coin with unknown heads probability <i>p</i>. Where
+ * <i>n</i> is the number of independent flips of that coin, and <i>k</i> is the number
+ * of times that the coin comes up heads during a given batch of <i>n</i> flips.
+ * This class computes a frequentist confidence interval [lowerBoundOnP, upperBoundOnP] for the
+ * unknown <i>p</i>.</p>
+ *
+ * <p>Conceptually, the desired confidence level is specified by a tail probability delta.</p>
+ *
+ * <p>Ideally, over a large ensemble of independent batches of trials,
+ * the fraction of batches in which the true <i>p</i> lies below lowerBoundOnP would be at most
+ * delta, and the fraction of batches in which the true <i>p</i> lies above upperBoundOnP
+ * would also be at most delta.
+ *
+ * <p>Setting aside the philosophical difficulties attaching to that statement, it isn't quite
+ * true because we are approximating the Clopper-Pearson interval.</p>
+ *
+ * <p>Finally, we point out that in this class's interface, the confidence parameter delta is
+ * not specified directly, but rather through a "number of standard deviations" numStdDev.
+ * The library effectively converts that to a delta via delta = normalCDF (-1.0 * numStdDev).</p>
+ *
+ * <p>It is perhaps worth emphasizing that the library is NOT merely adding and subtracting
+ * numStdDev standard deviations to the estimate. It is doing something better, that to some
+ * extent accounts for the fact that the binomial distribution has a non-gaussian shape.</p>
+ *
+ * <p>In particular, it is using an approximation to the inverse of the incomplete beta function
+ * that appears as formula 26.5.22 on page 945 of the "Handbook of Mathematical Functions"
+ * by Abramowitz and Stegun.</p>
+ *
+ * @author Kevin Lang
+ * @author Jon Malkin
+ */
+class bounds_binomial_proportions { // confidence intervals for binomial proportions
+
+public:
+  /**
+   * Computes lower bound of approximate Clopper-Pearson confidence interval for a binomial
+   * proportion.
+   *
+   * <p>Implementation Notes:<br>
+   * The approximateLowerBoundOnP is defined with respect to the right tail of the binomial
+   * distribution.</p>
+   * <ul>
+   * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,k,n</i></sub>bino(<i>j;n,p</i>)
+   * = delta.</li>
+   * <li>We now restate that in terms of the left tail.</li>
+   * <li>We want to solve for the p for which sum<sub><i>j,0,(k-1)</i></sub>bino(<i>j;n,p</i>)
+   * = 1 - delta.</li>
+   * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+   * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k+1,k)</i></sub> = 1 - delta.</li>
+   * <li>We specify 1-delta via numStdDevs through the right tail of the standard normal
+   * distribution.</li>
+   * <li>Smaller values of numStdDevs correspond to bigger values of 1-delta and hence to smaller
+   * values of delta. In fact, usefully small values of delta correspond to negative values of
+   * numStdDevs.</li>
+   * <li>return <i>p</i> = 1-<i>x</i>.</li>
+   * </ul>
+   *
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+   * @param num_std_devs the number of standard deviations defining the confidence interval
+   * @return the lower bound of the approximate Clopper-Pearson confidence interval for the
+   * unknown success probability.
+   */
+  static double approximate_lower_bound_on_p(long n, long k, double num_std_devs) {
+    check_inputs(n, k);
+    if (n == 0) { return 0.0; } // the coin was never flipped, so we know nothing
+    else if (k == 0) { return 0.0; }
+    else if (k == 1) { return (exact_lower_bound_on_p_k_eq_1(n, delta_of_num_stdevs(num_std_devs))); }
+    else if (k == n) { return (exact_lower_bound_on_p_k_eq_n(n, delta_of_num_stdevs(num_std_devs))); }
+    else {
+      double x = abramowitz_stegun_formula_26p5p22((n - k) + 1, k, (-1.0 * num_std_devs));
+      return (1.0 - x); // which is p
+    }
+  }
+
+  /**
+   * Computes upper bound of approximate Clopper-Pearson confidence interval for a binomial
+   * proportion.
+   *
+   * <p>Implementation Notes:<br>
+   * The approximateUpperBoundOnP is defined with respect to the left tail of the binomial
+   * distribution.</p>
+   * <ul>
+   * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,0,k</i></sub>bino(<i>j;n,p</i>)
+   * = delta.</li>
+   * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+   * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k,k+1)</i></sub> = delta.</li>
+   * <li>We specify delta via numStdDevs through the right tail of the standard normal
+   * distribution.</li>
+   * <li>Bigger values of numStdDevs correspond to smaller values of delta.</li>
+   * <li>return <i>p</i> = 1-<i>x</i>.</li>
+   * </ul>
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed <i>n</i>.
+   * @param num_std_devs the number of standard deviations defining the confidence interval
+   * @return the upper bound of the approximate Clopper-Pearson confidence interval for the
+   * unknown success probability.
+   */
+  static double approximate_upper_bound_on_p(long n, long k, double num_std_devs) {
+    check_inputs(n, k);
+    if (n == 0) { return 1.0; } // the coin was never flipped, so we know nothing
+    else if (k == n) { return 1.0; }
+    else if (k == (n - 1)) {
+      return (exactU_upper_bound_on_p_k_eq_minusone(n, delta_of_num_stdevs(num_std_devs)));
+    }
+    else if (k == 0) {
+      return (exact_upper_bound_on_p_k_eq_zero(n, delta_of_num_stdevs(num_std_devs)));
+    }
+    else {
+      double x = abramowitz_stegun_formula_26p5p22(n - k, k + 1, num_std_devs);
+      return (1.0 - x); // which is p
+    }
+  }
+
+  /**
+   * Computes an estimate of an unknown binomial proportion.
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+   * @return the estimate of the unknown binomial proportion.
+   */
+  static double estimate_unknown_p(long n, long k) {
+    check_inputs(n, k);
+    if (n == 0) { return 0.5; } // the coin was never flipped, so we know nothing
+    else { return ((double) k / (double) n); }
+  }
+
+  /**
+   * Computes an approximation to the erf() function.
+   * @param x is the input to the erf function
+   * @return returns erf(x), accurate to roughly 7 decimal digits.
+   */
+  static double erf(double x) {
+    if (x < 0.0) { return (-1.0 * (erf_of_nonneg(-1.0 * x))); }
+    else { return (erf_of_nonneg(x)); }
+  }
+
+  /**
+   * Computes an approximation to normal_cdf(x).
+   * @param x is the input to the normal_cdf function
+   * @return returns the approximation to normalCDF(x).
+   */
+   static double normal_cdf(double x) {
+    return (0.5 * (1.0 + (erf(x / (sqrt(2.0))))));
+  }
+
+private:
+  static void check_inputs(long n, long k) {
+    if (n < 0) { throw std::invalid_argument("N must be non-negative"); }
+    if (k < 0) { throw std::invalid_argument("K must be non-negative"); }
+    if (k > n) { throw std::invalid_argument("K cannot exceed N"); }
+  }
+
+  //@formatter:off
+  // Abramowitz and Stegun formula 7.1.28, p. 88; Claims accuracy of about 7 decimal digits */
+  static double erf_of_nonneg(double x) {
+    // The constants that appear below, formatted for easy checking against the book.
+    //    a1 = 0.07052 30784
+    //    a3 = 0.00927 05272
+    //    a5 = 0.00027 65672
+    //    a2 = 0.04228 20123
+    //    a4 = 0.00015 20143
+    //    a6 = 0.00004 30638
+    static const double a1 = 0.0705230784;
+    static const double a3 = 0.0092705272;
+    static const double a5 = 0.0002765672;
+    static const double a2 = 0.0422820123;
+    static const double a4 = 0.0001520143;
+    static const double a6 = 0.0000430638;
+    const double x2 = x * x; // x squared, x cubed, etc.
+    const double x3 = x2 * x;
+    const double x4 = x2 * x2;
+    const double x5 = x2 * x3;
+    const double x6 = x3 * x3;
+    const double sum = ( 1.0
+                 + (a1 * x)
+                 + (a2 * x2)
+                 + (a3 * x3)
+                 + (a4 * x4)
+                 + (a5 * x5)
+                 + (a6 * x6) );
+    const double sum2 = sum * sum; // raise the sum to the 16th power
+    const double sum4 = sum2 * sum2;
+    const double sum8 = sum4 * sum4;
+    const double sum16 = sum8 * sum8;
+    return (1.0 - (1.0 / sum16));
+  }
+
+  static double delta_of_num_stdevs(double kappa) {
+    return (normal_cdf(-1.0 * kappa));
+  }
+
+  //@formatter:on
+  // Formula 26.5.22 on page 945 of Abramowitz & Stegun, which is an approximation
+  // of the inverse of the incomplete beta function I_x(a,b) = delta
+  // viewed as a scalar function of x.
+  // In other words, we specify delta, and it gives us x (with a and b held constant).
+  // However, delta is specified in an indirect way through yp which
+  // is the number of stdDevs that leaves delta probability in the right
+  // tail of a standard gaussian distribution.
+
+  // We point out that the variable names correspond to those in the book,
+  // and it is worth keeping it that way so that it will always be easy to verify
+  // that the formula was typed in correctly.
+
+  static double abramowitz_stegun_formula_26p5p22(double a, double b,
+      double yp) {
+    const double b2m1 = (2.0 * b) - 1.0;
+    const double a2m1 = (2.0 * a) - 1.0;
+    const double lambda = ((yp * yp) - 3.0) / 6.0;
+    const double htmp = (1.0 / a2m1) + (1.0 / b2m1);
+    const double h = 2.0 / htmp;
+    const double term1 = (yp * (sqrt(h + lambda))) / h;
+    const double term2 = (1.0 / b2m1) - (1.0 / a2m1);
+    const double term3 = (lambda + (5.0 / 6.0)) - (2.0 / (3.0 * h));
+    const double w = term1 - (term2 * term3);
+    const double xp = a / (a + (b * (exp(2.0 * w))));
+    return xp;
+  }
+
+  // Formulas for some special cases.
+
+  static double exact_upper_bound_on_p_k_eq_zero(double n, double delta) {
+    return (1.0 - pow(delta, (1.0 / n)));
+  }
+
+  static double exact_lower_bound_on_p_k_eq_n(double n, double delta) {
+    return (pow(delta, (1.0 / n)));
+  }
+
+  static double exact_lower_bound_on_p_k_eq_1(double n, double delta) {
+    return (1.0 - pow((1.0 - delta), (1.0 / n)));
+  }
+
+  static double exactU_upper_bound_on_p_k_eq_minusone(double n, double delta) {
+    return (pow((1.0 - delta), (1.0 / n)));
+  }
+
+};
+
+}
+
+#endif // _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
diff --git a/sampling/include/var_opt_sketch.hpp b/sampling/include/var_opt_sketch.hpp
index 83f89c7..bf22a42 100644
--- a/sampling/include/var_opt_sketch.hpp
+++ b/sampling/include/var_opt_sketch.hpp
@@ -22,19 +22,30 @@
 
 #include "serde.hpp"
 
-#include <vector>
 #include <iterator>
+#include <vector>
 
 namespace datasketches {
 
 template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
 
-int num_allocs = 0;
-
 /**
  * author Kevin Lang 
  * author Jon Malkin
  */
+
+/**
+ * A struct to hold the result of subset sum queries
+ */
+struct subset_summary {
+  const double lower_bound;
+  const double estimate;
+  const double upper_bound;
+  const double total_sketch_weight;
+};
+
+template <typename T, typename S, typename A> class var_opt_union; // forward declaration
+
 template <typename T, typename S = serde<T>, typename A = std::allocator<T>>
 class var_opt_sketch {
 
@@ -43,10 +54,15 @@ class var_opt_sketch {
     static const resize_factor DEFAULT_RESIZE_FACTOR = X8;
 
     var_opt_sketch(uint32_t k, resize_factor rf = DEFAULT_RESIZE_FACTOR);
-    static var_opt_sketch<T,S,A> deserialize(std::istream& is);
-    static var_opt_sketch<T,S,A> deserialize(const void* bytes, size_t size);
+    var_opt_sketch(const var_opt_sketch& other);
+    var_opt_sketch(var_opt_sketch&& other) noexcept;
+    static var_opt_sketch deserialize(std::istream& is);
+    static var_opt_sketch deserialize(const void* bytes, size_t size);
 
-    virtual ~var_opt_sketch();
+    ~var_opt_sketch();
+
+    var_opt_sketch& operator=(const var_opt_sketch& other);
+    var_opt_sketch& operator=(var_opt_sketch&& other);
 
     void update(const T& item, double weight=1.0);
     //void update(T&& item, double weight=1.0);
@@ -72,7 +88,11 @@ class var_opt_sketch {
     std::ostream& to_stream(std::ostream& os) const;
     std::string to_string() const;
 
-    //estimate_subset_sum()
+    subset_summary estimate_subset_sum(std::function<bool(T)> predicate) const;
+
+    class const_iterator;
+    const_iterator begin() const;
+    const_iterator end() const;
 
   private:
     typedef typename std::allocator_traits<A>::template rebind_alloc<double> AllocDouble;
@@ -88,6 +108,9 @@ class var_opt_sketch {
     static const uint8_t EMPTY_FLAG_MASK  = 4;
     static const uint8_t GADGET_FLAG_MASK = 128;
 
+    // Number of standard deviations to use for subset sum error bounds
+    constexpr static const double DEFAULT_KAPPA = 2.0;
+
     // TODO: should probably rearrange a bit to minimize gaps once aligned
     uint32_t k_;                    // max size of sketch, in items
 
@@ -98,7 +121,7 @@ class var_opt_sketch {
     uint64_t n_;                    // total number of items processed by sketch
     double total_wt_r_;             // total weight of items in reservoir-like area
 
-    const resize_factor rf_;        // resize factor
+    resize_factor rf_;              // resize factor
 
     uint32_t curr_items_alloc_;     // currently allocated array size
     bool filled_data_;              // true if we've explciitly set all entries in data_
@@ -128,6 +151,10 @@ class var_opt_sketch {
     var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, uint8_t preamble_longs, std::istream& is);
     var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget, uint8_t preamble_longs, const void* bytes, size_t size);
 
+    friend class var_opt_union<T,S,A>;
+    var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n);
+    var_opt_sketch(T* data, double* weights, size_t len, uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r);
+
     // internal-use-only updates
     void update(const T& item, double weight, bool mark);
     void update_warmup_phase(const T& item, double weight, bool mark);
@@ -139,9 +166,9 @@ class var_opt_sketch {
     double peek_min() const;
     bool is_marked(int idx) const;
     
-    int pick_random_slot_in_r() const;
-    int choose_delete_slot(double wt_cand, int num_cand) const;
-    int choose_weighted_delete_slot(double wt_cand, int num_cand) const;
+    uint32_t pick_random_slot_in_r() const;
+    uint32_t choose_delete_slot(double wt_cand, int num_cand) const;
+    uint32_t choose_weighted_delete_slot(double wt_cand, int num_cand) const;
 
     void transition_from_warmup();
     void convert_to_heap();
@@ -151,6 +178,7 @@ class var_opt_sketch {
     void pop_min_to_m_region();
     void grow_candidate_set(double wt_cands, int num_cands);    
     void decrease_k_by_1();
+    void strip_marks();
     void force_set_k(int k); // used to resolve union gadget into sketch
     void downsample_candidate_set(double wt_cands, int num_cands);
     void swap_values(int src, int dst);
@@ -162,20 +190,19 @@ class var_opt_sketch {
     static void check_family_and_serialization_version(uint8_t family_id, uint8_t ser_ver);
     
     // things to move to common utils and share among sketches
-    static int get_adjusted_size(int max_size, int resize_target);
-    static int starting_sub_multiple(int lg_target, int lg_rf, int lg_min);
+    static uint32_t get_adjusted_size(int max_size, int resize_target);
+    static uint32_t starting_sub_multiple(int lg_target, int lg_rf, int lg_min);
+    static double pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate);
+    static double pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate);
     static bool is_power_of_2(uint32_t v);
     static uint32_t to_log_2(uint32_t v);
     static uint32_t count_trailing_zeros(uint32_t v);
     static uint32_t ceiling_power_of_2(uint32_t n);
-    static int next_int(int max_value);
+    static uint32_t next_int(uint32_t max_value);
     static double next_double_exclude_zero();
-
-    class const_iterator;
-    const_iterator begin() const;
-    const_iterator end() const;
 };
 
+/*
 template<typename T, typename S, typename A>
 class var_opt_sketch<T, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
 public:
@@ -189,12 +216,49 @@ public:
 private:
   const T* items;
   const double* weights;
+  const bool* marks;
   const uint32_t h_count;
   const uint32_t r_count;
+  const double total_wt_r;
   const double r_item_wt;
+  double cum_weight; // used for weight correction in R
+  uint32_t final_idx;
   uint32_t index;
-  const_iterator(const T* items, const double* weights, const uint32_t h_count, const uint32_t r_count,
-                 const double total_wt_r, bool use_end=false);
+  const bool get_mark() const;
+  const_iterator(const T* items, const double* weights, const bool* marks_,
+                 const uint32_t h_count, const uint32_t r_count, const double total_wt_r, bool use_end=false);
+};
+*/
+
+template<typename T, typename S, typename A>
+class var_opt_sketch<T, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+  const_iterator(const const_iterator& other);
+  const_iterator& operator++();
+  const_iterator& operator++(int);
+  bool operator==(const const_iterator& other) const;
+  bool operator!=(const const_iterator& other) const;
+  const std::pair<const T&, const double> operator*() const;
+
+private:
+  friend class var_opt_sketch<T,S,A>;
+  friend class var_opt_union<T,S,A>;
+
+  // default iterator over full sketch
+  const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end);
+  
+  // iterates over only one of the H or R region, optionally applying weight correction
+  // to R region (can correct for numerical precision issues)
+  const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end, bool use_r_region, bool weight_corr);
+
+  const bool get_mark() const;
+
+  const var_opt_sketch<T,S,A>* sk_;
+  double cum_r_weight_; // used for weight correction
+  double r_item_wt_;
+  size_t idx_;
+  size_t final_idx_;
+  bool weight_correction_;
 };
 
 } // namespace datasketches
diff --git a/sampling/include/var_opt_sketch_impl.hpp b/sampling/include/var_opt_sketch_impl.hpp
index 9fac697..26d4af3 100644
--- a/sampling/include/var_opt_sketch_impl.hpp
+++ b/sampling/include/var_opt_sketch_impl.hpp
@@ -22,6 +22,7 @@
 
 #include "var_opt_sketch.hpp"
 #include "serde.hpp"
+#include "bounds_binomial_proportions.hpp"
 
 #include <memory>
 #include <iostream>
@@ -42,6 +43,101 @@ var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf) :
   var_opt_sketch<T,S,A>(k, rf, false) {}
 
 template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other) :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(other.n_),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(nullptr),
+  weights_(nullptr),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(nullptr)
+  {
+    data_ = A().allocate(curr_items_alloc_);
+    std::copy(&other.data_[0], &other.data_[curr_items_alloc_], data_);
+    
+    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
+    
+    if (other.marks_ != nullptr) {
+      marks_ = AllocBool().allocate(curr_items_alloc_);
+      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+    }
+  }
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(const var_opt_sketch& other, bool as_sketch, uint64_t adjusted_n) :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(adjusted_n),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(nullptr),
+  weights_(nullptr),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(nullptr)
+  {
+    data_ = A().allocate(curr_items_alloc_);
+    std::copy(&other.data_[0], &other.data_[curr_items_alloc_], data_);
+    
+    weights_ = AllocDouble().allocate(curr_items_alloc_);
+    std::copy(&other.weights_[0], &other.weights_[curr_items_alloc_], weights_);
+    
+    if (!as_sketch && other.marks_ != nullptr) {
+      marks_ = AllocBool().allocate(curr_items_alloc_);
+      std::copy(&other.marks_[0], &other.marks_[curr_items_alloc_], marks_);
+    }
+  }
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(T* data, double* weights, size_t len,
+                                      uint32_t k, uint64_t n, uint32_t h_count, uint32_t r_count, double total_wt_r) :
+  k_(k),
+  h_(h_count),
+  m_(0),
+  r_(r_count),
+  n_(n),
+  total_wt_r_(total_wt_r),
+  rf_(DEFAULT_RESIZE_FACTOR),
+  curr_items_alloc_(len),
+  filled_data_(n > k),
+  data_(data),
+  weights_(weights),
+  num_marks_in_h_(0),
+  marks_(nullptr)
+  {}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::var_opt_sketch(var_opt_sketch&& other) noexcept :
+  k_(other.k_),
+  h_(other.h_),
+  m_(other.m_),
+  r_(other.r_),
+  n_(other.n_),
+  total_wt_r_(other.total_wt_r_),
+  rf_(other.rf_),
+  curr_items_alloc_(other.curr_items_alloc_),
+  filled_data_(other.filled_data_),
+  data_(other.data_),
+  weights_(other.weights_),
+  num_marks_in_h_(other.num_marks_in_h_),
+  marks_(other.marks_)
+  {
+    other.data_ = nullptr;
+    other.weights_ = nullptr;
+    other.marks_ = nullptr;
+  }
+
+template<typename T, typename S, typename A>
 var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadget) :
   k_(k), h_(0), m_(0), r_(0), n_(0), total_wt_r_(0.0), rf_(rf) {
   if (k < 1) {
@@ -61,27 +157,67 @@ var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadg
 
 template<typename T, typename S, typename A>
 var_opt_sketch<T,S,A>::~var_opt_sketch() {
-  if (filled_data_) {
-    // destroy everything
-    for (size_t i = 0; i < curr_items_alloc_; ++i) {
-      A().destroy(data_ + i);      
-    }
-  } else {
-    // skip gap or anything unused at the end
-    for (size_t i = 0; i < h_; ++i) {
-      A().destroy(data_+ i);
-    }
+  if (data_ != nullptr) {
+    if (filled_data_) {
+      // destroy everything
+      for (size_t i = 0; i < curr_items_alloc_; ++i) {
+        A().destroy(data_ + i);      
+      }
+    } else {
+      // skip gap or anything unused at the end
+      for (size_t i = 0; i < h_; ++i) {
+        A().destroy(data_+ i);
+      }
     
-    for (size_t i = h_ + 1; i < curr_items_alloc_; ++i) {
-      A().destroy(data_ + i);
+      for (size_t i = h_ + 1; i < curr_items_alloc_; ++i) {
+        A().destroy(data_ + i);
+      }
     }
+    A().deallocate(data_, curr_items_alloc_);
   }
-  A().deallocate(data_, curr_items_alloc_);
-  AllocDouble().deallocate(weights_, curr_items_alloc_);
+
+  if (weights_ != nullptr)
+    AllocDouble().deallocate(weights_, curr_items_alloc_);
   
-  if (marks_ != nullptr) {
+  if (marks_ != nullptr)
     AllocBool().deallocate(marks_, curr_items_alloc_);
-  }
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(const var_opt_sketch& other) {
+  var_opt_sketch<T,S,A> sk_copy(other);
+  std::swap(k_, sk_copy.k_);
+  std::swap(h_, sk_copy.h_);
+  std::swap(m_, sk_copy.m_);
+  std::swap(r_, sk_copy.r_);
+  std::swap(n_, sk_copy.n_);
+  std::swap(total_wt_r_, sk_copy.total_wt_r_);
+  std::swap(rf_, sk_copy.rf_);
+  std::swap(curr_items_alloc_, sk_copy.curr_items_alloc_);
+  std::swap(filled_data_, sk_copy.filled_data_);
+  std::swap(data_, sk_copy.data_);
+  std::swap(weights_, sk_copy.weights_);
+  std::swap(num_marks_in_h_, sk_copy.num_marks_in_h_);
+  std::swap(marks_, sk_copy.marks_);
+  return *this;
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>& var_opt_sketch<T,S,A>::operator=(var_opt_sketch&& other) {
+  std::swap(k_, other.k_);
+  std::swap(h_, other.h_);
+  std::swap(m_, other.m_);
+  std::swap(r_, other.r_);
+  std::swap(n_, other.n_);
+  std::swap(total_wt_r_, other.total_wt_r_);
+  std::swap(rf_, other.rf_);
+  std::swap(curr_items_alloc_, other.curr_items_alloc_);
+  std::swap(filled_data_, other.filled_data_);
+  std::swap(data_, other.data_);
+  std::swap(weights_, other.weights_);
+  std::swap(num_marks_in_h_, other.num_marks_in_h_);
+  std::swap(marks_, other.marks_);
+  return *this;
 }
 
 template<typename T, typename S, typename A>
@@ -129,7 +265,7 @@ var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadg
     is.read((char*)&total_wt_r_, sizeof(total_wt_r_));
     if (isnan(total_wt_r_) || r_ == 0 || total_wt_r_ <= 0.0) {
       throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
-       "Found r = " + std::to_string(r_) + ", R regionw weight = " + std::to_string(total_wt_r_));
+       "Found r = " + std::to_string(r_) + ", R region weight = " + std::to_string(total_wt_r_));
     }
 
     curr_items_alloc_ = k_ + 1;
@@ -206,7 +342,7 @@ var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadg
     ptr += copy_from_mem(ptr, &total_wt_r_, sizeof(total_wt_r_));
     if (isnan(total_wt_r_) || r_ == 0 || total_wt_r_ <= 0.0) {
       throw std::invalid_argument("Possible corruption: deserializing in full mode but r = 0 or invalid R weight. "
-       "Found r = " + std::to_string(r_) + ", R regionw weight = " + std::to_string(total_wt_r_));
+       "Found r = " + std::to_string(r_) + ", R region weight = " + std::to_string(total_wt_r_));
     }
 
     curr_items_alloc_ = k_ + 1;
@@ -279,7 +415,6 @@ var_opt_sketch<T,S,A>::var_opt_sketch(uint32_t k, resize_factor rf, bool is_gadg
 template<typename T, typename S, typename A>
 template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
 size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
-  std::cerr << "fixed-size version!";
   if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
   size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
   num_bytes += h_ * sizeof(double);    // weights
@@ -294,7 +429,6 @@ size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
 template<typename T, typename S, typename A>
 template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
 size_t var_opt_sketch<T,S,A>::get_serialized_size_bytes() const {
-  std::cerr << "other version!";
   if (is_empty()) { return PREAMBLE_LONGS_EMPTY << 3; }
   size_t num_bytes = (r_ == 0 ? PREAMBLE_LONGS_WARMUP : PREAMBLE_LONGS_FULL) << 3;
   num_bytes += h_ * sizeof(double);    // weights
@@ -414,9 +548,6 @@ void var_opt_sketch<T,S,A>::serialize(std::ostream& os) const {
 
     // write the first h_ weights
     os.write((char*)weights_, h_ * sizeof(double));
-    //for (int i = 0; i < h_; ++i) {
-    //  os.write((char*)&weights_[i], sizeof(double));
-    //}
 
     // write the first h_ marks as packed bytes iff we have a gadget
     if (marks_ != nullptr) {
@@ -648,7 +779,6 @@ void var_opt_sketch<T,S,A>::update_warmup_phase(const T& item, double weight, bo
   }
 
   // store items as they come in until full
-  std::cerr << "h_=" << h_ << ":\t" << item << "\t(alloc: " << curr_items_alloc_ << ")\n";
   A().construct(&data_[h_], T(item));
   weights_[h_] = weight;
   if (marks_ != nullptr) {
@@ -724,9 +854,72 @@ void var_opt_sketch<T,S,A>::update_heavy_r_eq1(const T& item, double weight, boo
   grow_candidate_set(weights_[m_slot] + total_wt_r_, 2);
 }
 
+/**
+ * Decreases sketch's value of k by 1, updating stored values as needed.
+ *
+ * <p>Subject to certain pre-conditions, decreasing k causes tau to increase. This fact is used by
+ * the unioning algorithm to force "marked" items out of H and into the reservoir region.</p>
+ */
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::decrease_k_by_1() {
+  if (k_ <= 1) {
+    throw std::logic_error("Cannot decrease k below 1 in union");
+  }
+
+  if ((h_ == 0) && (r_ == 0)) {
+    // exact mode, but no data yet; this reduction is somewhat gratuitous
+    --k_;
+  } else if ((h_ > 0) && (r_ == 0)) {
+    // exact mode, but we have some data
+    --k_;
+    if (h_ > k_) {
+      transition_from_warmup();
+    }
+  } else if ((h_ > 0) && (r_ > 0)) {
+    // reservoir mode, but we have some exact samples.
+    // Our strategy will be to pull an item out of H (which we are allowed to do since it's
+    // still just data), reduce k, and then re-insert the item
+
+    // first, slide the R zone to the left by 1, temporarily filling the gap
+    uint32_t old_gap_idx = h_;
+    uint32_t old_final_r_idx = (h_ + 1 + r_) - 1;
+
+    assert(old_final_r_idx == k_);
+    swap_values(old_final_r_idx, old_gap_idx);
+
+    // now we pull an item out of H; any item is ok, but if we grab the rightmost and then
+    // reduce h_, the heap invariant will be preserved (and the gap will be restored), plus
+    // the push() of the item that will probably happen later will be cheap.
+
+    uint32_t pulled_idx = h_ - 1;
+    T pulled_item = data_[pulled_idx];
+    double pulled_weight = weights_[pulled_idx];
+    bool pulled_mark = marks_[pulled_idx];
+
+    if (pulled_mark) { --num_marks_in_h_; }
+    weights_[pulled_idx] = -1.0; // to make bugs easier to spot
+
+    --h_;
+    --k_;
+    --n_; // will be re-incremented with the update
+
+    update(pulled_item, pulled_weight, pulled_mark);
+  } else if ((h_ == 0) && (r_ > 0)) {
+    // pure reservoir mode, so can simply eject a randomly chosen sample from the reservoir
+    assert(r_ >= 2);
+
+    uint32_t r_idx_to_delete = 1 + next_int(r_); // 1 for the gap
+    uint32_t rightmost_r_idx = (1 + r_) - 1;
+    swap_values(r_idx_to_delete, rightmost_r_idx);
+    weights_[rightmost_r_idx] = -1.0;
+
+    --k_;
+    --r_;
+  }
+}
+
 template<typename T, typename S, typename A>
 void var_opt_sketch<T,S,A>::allocate_data_arrays(uint32_t tgt_size, bool use_marks) {
-  std::cerr << "allocate_data_arrays(" << tgt_size << ", " << use_marks << ")\n";
   filled_data_ = false;
 
   data_ = A().allocate(tgt_size);
@@ -813,12 +1006,11 @@ void var_opt_sketch<T,S,A>::convert_to_heap() {
     restore_towards_leaves(j);
   }
 
-  // TODO: Remove this block
   // validates heap, used for initial debugging
-  for (int j = h_ - 1; j >= 1; --j) {
-    int p = ((j + 1) / 2) - 1;
-    assert(weights_[p] <= weights_[j]);
-  }
+  //for (int j = h_ - 1; j >= 1; --j) {
+  //  int p = ((j + 1) / 2) - 1;
+  //  assert(weights_[p] <= weights_[j]);
+  //}
 }
 
 template<typename T, typename S, typename A>
@@ -984,7 +1176,7 @@ void var_opt_sketch<T,S,A>::downsample_candidate_set(double wt_cands, int num_ca
 }
 
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::choose_delete_slot(double wt_cands, int num_cands) const {
+uint32_t var_opt_sketch<T,S,A>::choose_delete_slot(double wt_cands, int num_cands) const {
   assert(r_ > 0);
 
   if (m_ == 0) {
@@ -1012,7 +1204,7 @@ int var_opt_sketch<T,S,A>::choose_delete_slot(double wt_cands, int num_cands) co
 }
 
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::choose_weighted_delete_slot(double wt_cands, int num_cands) const {
+uint32_t var_opt_sketch<T,S,A>::choose_weighted_delete_slot(double wt_cands, int num_cands) const {
   assert(m_ >= 1);
 
   int offset = h_;
@@ -1036,7 +1228,7 @@ int var_opt_sketch<T,S,A>::choose_weighted_delete_slot(double wt_cands, int num_
 }
 
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::pick_random_slot_in_r() const {
+uint32_t var_opt_sketch<T,S,A>::pick_random_slot_in_r() const {
   assert(r_ > 0);
   int offset = h_ + m_;
   if (r_ == 1) {
@@ -1053,7 +1245,7 @@ double var_opt_sketch<T,S,A>::peek_min() const {
 }
 
 template<typename T, typename S, typename A>
-bool var_opt_sketch<T,S,A>::is_marked(int idx) const {
+inline bool var_opt_sketch<T,S,A>::is_marked(int idx) const {
   return marks_ == nullptr ? false : marks_[idx];
 }
 
@@ -1062,6 +1254,13 @@ double var_opt_sketch<T,S,A>::get_tau() const {
   return r_ == 0 ? std::nan("1") : (total_wt_r_ / r_);
 }
 
+template<typename T, typename S, typename A>
+void var_opt_sketch<T,S,A>::strip_marks() {
+  assert(marks_ != nullptr);
+  num_marks_in_h_ = 0;
+  AllocBool().deallocate(marks_, curr_items_alloc_);
+  marks_ = nullptr;
+}
 
 template<typename T, typename S, typename A>
 void var_opt_sketch<T,S,A>::check_preamble_longs(uint8_t preamble_longs, uint8_t flags) {
@@ -1099,44 +1298,127 @@ void var_opt_sketch<T,S,A>::check_family_and_serialization_version(uint8_t famil
     + std::to_string(FAMILY) + ". Found: " + std::to_string(family_id));
 }
 
+template<typename T, typename S, typename A>
+subset_summary var_opt_sketch<T, S, A>::estimate_subset_sum(std::function<bool(T)> predicate) const {
+  if (n_ == 0) {
+    return {0.0, 0.0, 0.0, 0.0};
+  }
+
+  double total_wt_h = 0.0;
+  double h_true_wt = 0.0;
+  size_t idx = 0;
+  for (; idx < h_; ++idx) {
+    double wt = weights_[idx];
+    total_wt_h += wt;
+    if (predicate(data_[idx])) {
+      h_true_wt += wt;
+    }
+  }
+
+  // if only heavy items, we have an exact answre
+  if (r_ == 0) {
+    return {h_true_wt, h_true_wt, h_true_wt, h_true_wt};
+  }
+
+  uint64_t num_samples = n_ - h_;
+  assert(num_samples > 0);
+  double effective_sampling_rate = r_ / static_cast<double>(num_samples);
+  assert(effective_sampling_rate >= 0.0);
+  assert(effective_sampling_rate <= 1.0);
+
+  size_t r_true_count = 0;
+  ++idx; // skip the gap
+  for (; idx < (k_ + 1); ++idx) {
+    if (predicate(data_[idx])) {
+      ++r_true_count;
+    }
+  }
+
+  double lb_true_fraction = pseudo_hypergeometric_lb_on_p(r_, r_true_count, effective_sampling_rate);
+  double estimated_true_fraction = (1.0 * r_true_count) / r_;
+  double ub_true_fraction = pseudo_hypergeometric_ub_on_p(r_, r_true_count, effective_sampling_rate);
+  
+  return {  h_true_wt + (total_wt_r_ * lb_true_fraction),
+            h_true_wt + (total_wt_r_ * estimated_true_fraction),
+            h_true_wt + (total_wt_r_ * ub_true_fraction),
+            total_wt_h + total_wt_r_
+         };
+}
 
 template<typename T, typename S, typename A>
 typename var_opt_sketch<T, S, A>::const_iterator var_opt_sketch<T, S, A>::begin() const {
-  return var_opt_sketch<T, S, A>::const_iterator(data_, weights_, h_, r_, total_wt_r_, false);
+  return var_opt_sketch<T, S, A>::const_iterator(*this, false);
 }
 
 template<typename T, typename S, typename A>
 typename var_opt_sketch<T, S, A>::const_iterator var_opt_sketch<T, S, A>::end() const {
-  return var_opt_sketch<T, S, A>::const_iterator(data_, weights_, h_, r_, total_wt_r_, true);
+  return var_opt_sketch<T, S, A>::const_iterator(*this, true);
 }
 
 // -------- var_opt_sketch::const_iterator implementation ---------
 
 template<typename T, typename S, typename A>
-var_opt_sketch<T,S,A>::const_iterator::const_iterator(const T* items, const double* weights,
-                                                      const uint32_t h_count, const uint32_t r_count,
-                                                      const double total_wt_r, bool use_end) :
-items(items), weights(weights), h_count(h_count), r_count(r_count), r_item_wt(r_count > 0 ? total_wt_r/r_count : -1.0) {
+var_opt_sketch<T,S,A>::const_iterator::const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end) :
+  sk_(&sk),
+  cum_r_weight_(0.0),
+  r_item_wt_(sk.get_tau()),
+  weight_correction_(false)
+{
   // index logic easier to read if not inline
-  if (use_end) {
-    index = (r_count > 0 ? h_count + r_count + 1 : h_count);
+  final_idx_ = (sk.r_ > 0 ? sk.h_ + sk.r_ + 1 : sk.h_);
+  if (is_end) {
+    idx_ = final_idx_;
+    sk_ = nullptr;
   } else {
-    index = (h_count == 0 && r_count > 0 ? 1 : 0); // skip the gap if at the start
+    idx_ = (sk.h_ == 0 && sk.r_ > 0 ? 1 : 0); // skip if gap is at start
   }
+
+  // should only apply if sketch is empty
+  if (idx_ == final_idx_) { sk_ = nullptr; }
 }
 
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A>::const_iterator::const_iterator(const var_opt_sketch<T,S,A>& sk, bool is_end, bool use_r_region, bool weight_corr) :
+  sk_(&sk),
+  cum_r_weight_(0.0),
+  r_item_wt_(sk.get_tau()),
+  weight_correction_(weight_corr)
+{
+  if (use_r_region) {
+    idx_ = sk.h_ + 1 + (is_end ? sk.r_ : 0);
+    final_idx_ = sk.h_ + 1 + sk.r_;
+  } else { // H region
+    // gap at start only if h_ == 0, so index always starts at 0
+    idx_ = (is_end ? sk.h_ : 0);
+    final_idx_ = sk.h_;
+  }
+  
+  // unlike in full iterator case, may happen even if sketch is not empty
+  if (idx_ == final_idx_) { sk_ = nullptr; }
+}
+
+
 template<typename T,  typename S, typename A>
-var_opt_sketch<T, S, A>::const_iterator::const_iterator(const const_iterator& other):
-items(other.items), weights(other.weights), h_count(other.h_count), r_count(other.r_count), r_item_wt(other.r_item_wt), index(other.index)
+var_opt_sketch<T, S, A>::const_iterator::const_iterator(const const_iterator& other) :
+  sk_(other.sk_),
+  cum_r_weight_(other.cum_r_weight_),
+  r_item_wt_(other.r_item_wt_),
+  idx_(other.idx_),
+  final_idx_(other.final_idx_),
+  weight_correction_(other.weight_correction_)
 {}
 
 template<typename T,  typename S, typename A>
 typename var_opt_sketch<T, S, A>::const_iterator& var_opt_sketch<T, S, A>::const_iterator::operator++() {
-  ++index;
-  // check for the gap
-  if (index == h_count && r_count > 0) {
-    ++index;
+  ++idx_;
+  
+  if (idx_ == final_idx_) {
+    sk_ = nullptr;
+    return *this;
+  } else if (idx_ == sk_->h_ && sk_->r_ > 0) { // check for the gap
+    ++idx_;
   }
+  if (idx_ > sk_->h_) { cum_r_weight_ += r_item_wt_; }
   return *this;
 }
 
@@ -1149,8 +1431,9 @@ typename var_opt_sketch<T, S, A>::const_iterator& var_opt_sketch<T, S, A>::const
 
 template<typename T, typename S, typename A>
 bool var_opt_sketch<T, S, A>::const_iterator::operator==(const const_iterator& other) const {
-  if (items != other.items || h_count != other.h_count || r_count != other.r_count) return false;
-  return index == other.index;
+  if (sk_ != other.sk_) return false;
+  if (sk_ == nullptr) return true; // end (and we know other.sk_ is also null)
+  return idx_ == other.idx_;
 }
 
 template<typename T, typename S, typename A>
@@ -1160,7 +1443,20 @@ bool var_opt_sketch<T, S, A>::const_iterator::operator!=(const const_iterator& o
 
 template<typename T, typename S, typename A>
 const std::pair<const T&, const double> var_opt_sketch<T, S, A>::const_iterator::operator*() const {
-  return std::pair<const T&, const double>(items[index], weights[index]);
+  double wt;
+  if (idx_ < sk_->h_) {
+    wt = sk_->weights_[idx_];
+  } else if (weight_correction_ && idx_ == final_idx_ - 1) {
+    wt = sk_->total_wt_r_ - cum_r_weight_;
+  } else {
+    wt = r_item_wt_;
+  }
+  return std::pair<const T&, const double>(sk_->data_[idx_], wt);
+}
+
+template<typename T, typename S, typename A>
+const bool var_opt_sketch<T, S, A>::const_iterator::get_mark() const {
+  return sk_->marks_ == nullptr ? false : sk_->marks_[idx_];
 }
 
 
@@ -1177,7 +1473,7 @@ namespace random_utils {
  * If so, returns max sampling size, otherwise passes through target size.
  */
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::get_adjusted_size(int max_size, int resize_target) {
+uint32_t var_opt_sketch<T,S,A>::get_adjusted_size(int max_size, int resize_target) {
   if (max_size - (resize_target << 1) < 0L) {
     return max_size;
   }
@@ -1185,13 +1481,25 @@ int var_opt_sketch<T,S,A>::get_adjusted_size(int max_size, int resize_target) {
 }
 
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::starting_sub_multiple(int lg_target, int lg_rf, int lg_min) {
+uint32_t var_opt_sketch<T,S,A>::starting_sub_multiple(int lg_target, int lg_rf, int lg_min) {
   return (lg_target <= lg_min)
           ? lg_min : (lg_rf == 0) ? lg_target
           : (lg_target - lg_min) % lg_rf + lg_min;
 }
 
 template<typename T, typename S, typename A>
+double var_opt_sketch<T,S,A>::pseudo_hypergeometric_ub_on_p(uint64_t n, uint32_t k, double sampling_rate) {
+  double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
+  return bounds_binomial_proportions::approximate_upper_bound_on_p(n, k, adjusted_kappa);
+}
+
+template<typename T, typename S, typename A>
+double var_opt_sketch<T,S,A>::pseudo_hypergeometric_lb_on_p(uint64_t n, uint32_t k, double sampling_rate) {
+  double adjusted_kappa = DEFAULT_KAPPA * sqrt(1 - sampling_rate);
+  return bounds_binomial_proportions::approximate_lower_bound_on_p(n, k, adjusted_kappa);
+}
+
+template<typename T, typename S, typename A>
 bool var_opt_sketch<T,S,A>::is_power_of_2(uint32_t v) {
   return v && !(v & (v - 1));
 }
@@ -1207,8 +1515,8 @@ uint32_t var_opt_sketch<T,S,A>::to_log_2(uint32_t v) {
 
 // Returns an integer in the range [0, max_value) -- excludes max_value
 template<typename T, typename S, typename A>
-int var_opt_sketch<T,S,A>::next_int(int max_value) {
-  std::uniform_int_distribution<> dist(0, max_value - 1);
+uint32_t var_opt_sketch<T,S,A>::next_int(uint32_t max_value) {
+  std::uniform_int_distribution<uint32_t> dist(0, max_value - 1);
   return dist(random_utils::rand);
 }
 
diff --git a/sampling/include/var_opt_union.hpp b/sampling/include/var_opt_union.hpp
new file mode 100644
index 0000000..a1a24ff
--- /dev/null
+++ b/sampling/include/var_opt_union.hpp
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _VAR_OPT_UNION_HPP_
+#define _VAR_OPT_UNION_HPP_
+
+#include "var_opt_sketch.hpp"
+#include "serde.hpp"
+
+#include <vector>
+
+namespace datasketches {
+
+template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
+
+/**
+ * author Kevin Lang 
+ * author Jon Malkin
+ */
+
+template <typename T, typename S = serde<T>, typename A = std::allocator<T>>
+class var_opt_union {
+
+public:
+  explicit var_opt_union(uint32_t max_k);
+  var_opt_union(const var_opt_union& other);
+  var_opt_union(var_opt_union&& other) noexcept;
+  //static var_opt_union deserialize(std::istream& is);
+  //static var_opt_union deserialize(const void* bytes, size_t size);
+    
+  ~var_opt_union();
+
+  var_opt_union& operator=(const var_opt_union& other);
+  var_opt_union& operator=(var_opt_union&& other);
+
+  void update(var_opt_sketch<T,S,A>& sk);
+  //void update(var_opt_sketch<T,S,A>>& sk);
+
+  void reset();
+
+  /**
+   * Gets the varopt sketch resulting from the union of any input sketches.
+   *
+   * @return A varopt sketch
+   */
+  var_opt_sketch<T,S,A> get_result() const;
+  
+  //std::vector<uint8_t, AllocU8<A>> serialize(unsigned header_size_bytes = 0) const;
+  //void serialize(std::ostream& os) const;
+
+  std::ostream& to_stream(std::ostream& os) const;
+  std::string to_string() const;
+
+
+private:
+  typedef typename std::allocator_traits<A>::template rebind_alloc<var_opt_sketch<T,S,A>> AllocSketch;
+
+  uint64_t n_; // cumulative over all input sketches
+
+  // outer tau is the largest tau of any input sketch
+  double outer_tau_numer_; // total weight of all input R-zones where tau = outer_tau
+
+  // total cardinality of the same R-zones, or zero if no input sketch was in estimation mode
+  uint64_t outer_tau_denom_;
+
+  const uint32_t max_k_;
+
+  var_opt_sketch<T,S,A> gadget_;
+
+  /*
+   IMPORTANT NOTE: the "gadget" in the union object appears to be a varopt sketch,
+   but in fact is NOT because it doesn't satisfy the mathematical definition
+   of a varopt sketch of the concatenated input streams. Therefore it could be different
+   from a true varopt sketch with that value of K, in which case it could easily provide
+   worse estimation accuracy for subset-sum queries.
+
+   This should not surprise you; the approximation guarantees of varopt sketches
+   do not apply to things that merely resemble varopt sketches.
+
+   However, even though the gadget is not a varopt sketch, the result
+   of the unioning process IS a varopt sketch. It is constructed by a
+   somewhat complicated "resolution" process which determines the largest K
+   that a valid varopt sketch could have given the available information,
+   then constructs a varopt sketch of that size and returns it.
+
+   However, the gadget itself is not touched during the resolution process,
+   and additional sketches could subsequently be merged into the union,
+   at which point a varopt result could again be requested.
+   */
+
+  /*
+   Explanation of "marked items" in the union's gadget:
+
+   The boolean value "true" in an pair indicates that the item
+   came from an input sketch's R zone, so it is already the result of sampling.
+
+   Therefore it must not wind up in the H zone of the final result, because
+   that would imply that the item is "exact".
+
+   However, it is okay for a marked item to hang out in the gadget's H zone for a while.
+
+   And once the item has moved to the gadget's R zone, the mark is never checked again,
+   so no effort is made to ensure that its value is preserved or even makes sense.
+   */
+
+  /*
+   Note: if the computer could perform exact real-valued arithmetic, the union could finalize
+   its result by reducing k until inner_tau > outer_tau. [Due to the vagaries of floating point
+   arithmetic, we won't attempt to detect and specially handle the inner_tau = outer_tau special
+   case.]
+
+   In fact, we won't even look at tau while while reducing k. Instead the logic will be based
+   on the more robust integer quantity num_marks_in_h_ in the gadget. It is conceivable that due
+   to round-off error we could end up with inner_tau slightly less than outer_tau, but that should
+   be fairly harmless since we will have achieved our goal of getting the marked items out of H.
+
+   Also, you might be wondering why we are bothering to maintain the numerator and denominator
+   separately instead of just having a single variable outer_tau. This allows us (in certain
+   cases) to add an input's entire R-zone weight into the result sketch, as opposed to subdividing
+   it then adding it back up. That would be a source of numerical inaccuracy. And even
+   more importantly, this design choice allows us to exactly re-construct the input sketch
+   when there is only one of them.
+   */
+  void merge_into(const var_opt_sketch<T,S,A>& sk);
+
+  double get_outer_tau() const;
+
+  var_opt_sketch<T,S,A> simple_gadget_coercer() const;
+
+  bool there_exist_unmarked_h_items_lighter_than_target(double threshold) const;
+  bool detect_and_handle_subcase_of_pseudo_exact(var_opt_sketch<T,S,A>& sk) const;
+  void mark_moving_gadget_coercer(var_opt_sketch<T,S,A>& sk) const;
+  void migrate_marked_items_by_decreasing_k(var_opt_sketch<T,S,A>& sk) const;
+};
+
+}
+
+#include "var_opt_union_impl.hpp"
+
+#endif // _VAR_OPT_UNION_HPP_
\ No newline at end of file
diff --git a/sampling/include/var_opt_union_impl.hpp b/sampling/include/var_opt_union_impl.hpp
new file mode 100644
index 0000000..9fe2323
--- /dev/null
+++ b/sampling/include/var_opt_union_impl.hpp
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _VAR_OPT_UNION_IMPL_HPP_
+#define _VAR_OPT_UNION_IMPL_HPP_
+
+#include "var_opt_union.hpp"
+
+#include <cmath>
+
+namespace datasketches {
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>::var_opt_union(uint32_t max_k) :
+  n_(0),
+  outer_tau_numer_(0),
+  outer_tau_denom_(0.0),
+  max_k_(max_k),
+  gadget_(max_k, var_opt_sketch<T,S,A>::DEFAULT_RESIZE_FACTOR, true)
+{}
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>::var_opt_union(const var_opt_union& other) :
+  n_(other.n_),
+  outer_tau_numer_(other.outer_tau_numer_),
+  outer_tau_denom_(other.outer_tau_denom_),
+  max_k_(other.max_k_),
+  gadget_(other.gadget)
+{}
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>::var_opt_union(var_opt_union&& other) noexcept :
+  n_(other.n_),
+  outer_tau_numer_(other.outer_tau_numer_),
+  outer_tau_denom_(other.outer_tau_denom_),
+  max_k_(other.max_k_)
+{
+  gadget_ = std::move(other.gadget_);
+}
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>::~var_opt_union() {}
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>& var_opt_union<T,S,A>::operator=(const var_opt_union& other) {
+  var_opt_union<T,S,A> union_copy(other);
+  std::swap(n_, union_copy.n_);
+  std::swap(outer_tau_numer_, union_copy.outer_tau_numer_);
+  std::swap(outer_tau_numer_, union_copy.outer_tau_denom_);
+  std::swap(max_k_, union_copy.max_k_);
+  std::swap(gadget_, union_copy.gadget_);
+  return *this;
+}
+
+template<typename T, typename S, typename A>
+var_opt_union<T,S,A>& var_opt_union<T,S,A>::operator=(var_opt_union&& other) {
+  std::swap(n_, other.n_);
+  std::swap(outer_tau_numer_, other.outer_tau_numer_);
+  std::swap(outer_tau_numer_, other.outer_tau_denom_);
+  std::swap(max_k_, other.max_k_);
+  std::swap(gadget_, other.gadget_);
+  return *this;
+}
+
+template<typename T, typename S, typename A>
+void var_opt_union<T,S,A>::reset() {
+  if (gadget_ != nullptr)
+    gadget_.reset();
+}
+
+template<typename T, typename S, typename A>
+std::ostream& var_opt_union<T,S,A>::to_stream(std::ostream& os) const {
+  os << "### VarOpt Union SUMMARY: " << std::endl;
+  os << " . n             : " << n_ << std::endl;
+  os << "   Max k         : " << max_k_ << std::endl;
+  os << "   Gadget Summary: " << std::endl;
+  gadget_.to_stream(os);
+  os << "### END VarOpt Union SUMMARY: " << std::endl;
+
+  return os;
+}
+
+template<typename T, typename S, typename A>
+std::string var_opt_union<T,S,A>::to_string() const {
+  std::ostringstream ss;
+  to_stream(ss);
+  return ss.str();
+}
+
+template<typename T, typename S, typename A>
+void var_opt_union<T,S,A>::update(var_opt_sketch<T,S,A>& sk) {
+  merge_into(sk);
+}
+
+template<typename T, typename S, typename A>
+double var_opt_union<T,S,A>::get_outer_tau() const {
+  if (outer_tau_denom_ == 0) {
+    return 0.0;
+  } else {
+    return outer_tau_numer_ / outer_tau_denom_;
+  }
+}
+
+template<typename T, typename S, typename A>
+void var_opt_union<T,S,A>::merge_into(const var_opt_sketch<T,S,A>& sketch) {
+  if (sketch.n_ == 0) {
+    return;
+  }
+
+  n_ += sketch.n_;
+
+  // H region iterator
+  typename var_opt_sketch<T,S,A>::const_iterator h_itr(sketch, false, false, false);
+  typename var_opt_sketch<T,S,A>::const_iterator h_end(sketch, true, false, false);
+  while (h_itr != h_end) {
+    std::pair<const T&, const double> sample = *h_itr;
+    gadget_.update(sample.first, sample.second, false);
+    ++h_itr;
+  }
+
+  // Weight-correcitng R region iterator
+  typename var_opt_sketch<T,S,A>::const_iterator r_itr(sketch, false, true, true);
+  typename var_opt_sketch<T,S,A>::const_iterator r_end(sketch, true, true, true);
+  while (r_itr != r_end) {
+    std::pair<const T&, const double> sample = *r_itr;
+    gadget_.update(sample.first, sample.second, true);
+    ++r_itr;
+  }
+
+  // resolve tau
+  if (sketch.r_ > 0) {
+    double sketch_tau = sketch.get_tau();
+    double outer_tau = get_outer_tau();
+
+    if (outer_tau_denom_ == 0) {
+      // detect first estimation mode sketch and grab its tau
+      outer_tau_numer_ = sketch.total_wt_r_;
+      outer_tau_denom_ = sketch.r_;
+    } else if (sketch_tau > outer_tau) {
+      // switch to a bigger value of outer_tau
+      outer_tau_numer_ = sketch.total_wt_r_;
+      outer_tau_denom_ = sketch.r_;
+    } else if (sketch_tau == outer_tau) {
+      // Ok if previous equality test isn't quite perfect. Mistakes in either direction should
+      // be fairly benign.
+      // Without conceptually changing outer_tau, update number and denominator. In particular,
+      // add the total weight of the incoming reservoir to the running total.
+      outer_tau_numer_ += sketch.total_wt_r_;
+      outer_tau_denom_ += sketch.r_;
+    }
+
+    // do nothing if sketch's tau is smaller than outer_tau
+  }
+}
+
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A> var_opt_union<T,S,A>::get_result() const {
+  // If no marked items in H, gadget is already valid mathematically. We can return what is
+  // basically just a copy of the gadget.
+  if (gadget_.num_marks_in_h_ == 0) {
+    return simple_gadget_coercer();
+  } else {
+    // Copy of gadget. This may produce needless copying in the
+    // pseudo-exact case below, but should simplify the code without
+    // needing to make the gadget a pointer
+    var_opt_sketch<T,S,A> gcopy(gadget_, false, n_);
+
+    // At this point, we know that marked items are present in H. So:
+    //   1. Result will necessarily be in estimation mode
+    //   2. Marked items currently in H need to be absorbed into reservoir (R)
+    bool is_pseudo_exact = detect_and_handle_subcase_of_pseudo_exact(gcopy);
+    if (!is_pseudo_exact) {
+      // continue with main logic
+      migrate_marked_items_by_decreasing_k(gcopy);
+    }
+    // sub-case was already detected and handled, so return the result
+    return gcopy;
+  }
+}
+
+/**
+ * When there are no marked items in H, the gadget is mathematically equivalent to a valid
+ * varopt sketch. This method simply returns a copy (without perserving marks).
+ *
+ * @return A shallow copy of the gadget as valid varopt sketch
+ */
+template<typename T, typename S, typename A>
+var_opt_sketch<T,S,A> var_opt_union<T,S,A>::simple_gadget_coercer() const {
+  assert(gadget_.num_marks_in_h_ == 0);
+  return var_opt_sketch<T,S,A>(gadget_, true, n_);
+}
+
+// this is a condition checked in detect_and_handle_subcase_of_pseudo_exact()
+template<typename T, typename S, typename A>
+bool var_opt_union<T,S,A>::there_exist_unmarked_h_items_lighter_than_target(double threshold) const {
+  for (int i = 0; i < gadget_.h_; ++i) {
+    if ((gadget_.weights_[i] < threshold) && !gadget_.marks_[i]) {
+      return true;
+    }
+  }
+  return false;
+}
+
+template<typename T, typename S, typename A>
+bool var_opt_union<T,S,A>::detect_and_handle_subcase_of_pseudo_exact(var_opt_sketch<T,S,A>& sk) const {
+  // gadget is seemingly exact
+  bool condition1 = gadget_.r_ == 0;
+
+  // but there are marked items in H, so only _pseudo_ exact
+  bool condition2 = gadget_.num_marks_in_h_ > 0;
+
+  // if gadget is pseudo-exact and the number of marks equals outer_tau_denom, then we can deduce
+  // from the bookkeeping logic of mergeInto() that all estimation mode input sketches must
+  // have had the same tau, so we can throw all of the marked items into a common reservoir.
+  bool condition3 = gadget_.num_marks_in_h_ == outer_tau_denom_;
+
+  if (!(condition1 && condition2 && condition3)) {
+    return false;
+  } else {
+
+    // explicitly enforce rule that items in H should not be lighter than the sketch's tau
+    bool anti_condition4 = there_exist_unmarked_h_items_lighter_than_target(gadget_.get_tau());
+    if (anti_condition4) {
+      return false;
+    } else {
+      // conditions 1 through 4 hold
+      mark_moving_gadget_coercer(sk);
+      return true;
+    }
+  }
+}
+
+/**
+ * This coercer directly transfers marked items from the gadget's H into the result's R.
+ * Deciding whether that is a valid thing to do is the responsibility of the caller. Currently,
+ * this is only used for a subcase of pseudo-exact, but later it might be used by other
+ * subcases as well.
+ *
+ * @param sk Copy of the gadget, modified with marked items moved to the reservoir
+ */
+template<typename T, typename S, typename A>
+void var_opt_union<T,S,A>::mark_moving_gadget_coercer(var_opt_sketch<T,S,A>& sk) const {
+  uint32_t result_k = gadget_.h_ + gadget_.r_;
+
+  uint32_t result_h = 0;
+  uint32_t result_r = 0;
+  size_t next_r_pos = result_k; // = (result_k+1)-1, to fill R region from back to front
+
+  typedef typename std::allocator_traits<A>::template rebind_alloc<double> AllocDouble;
+  double* wts = AllocDouble().allocate(result_k + 1);
+  T* data     = A().allocate(result_k + 1);
+    
+  // insert R region items, ignoring weights
+  // Currently (May 2017) this next block is unreachable; this coercer is used only in the
+  // pseudo-exact case in which case there are no items natively in R, only marked items in H
+  // that will be moved into R as part of the coercion process.
+  // Addedndum (Jan 2020): Cleanup at end of method assumes R count is 0
+  size_t final_idx = gadget_.get_num_samples();
+  for (size_t idx = gadget_.h_ + 1; idx <= final_idx; ++idx) {
+    data[next_r_pos] = gadget_.data_[idx];
+    wts[next_r_pos]  = gadget_.weights_[idx];
+    ++result_r;
+    --next_r_pos;
+  }
+  
+  double transferred_weight = 0;
+
+  // insert H region items
+  for (size_t idx = 0; idx < gadget_.h_; ++idx) {
+    if (gadget_.marks_[idx]) {
+      data[next_r_pos] = gadget_.data_[idx];
+      wts[next_r_pos] = -1.0;
+      transferred_weight += gadget_.weights_[idx];
+      ++result_r;
+      --next_r_pos;
+    } else {
+      data[result_h] = gadget_.data_[idx];
+      wts[result_h] = gadget_.weights_[idx];
+      ++result_h;
+    }
+  }
+
+  assert((result_h + result_r) == result_k);
+  assert(fabs(transferred_weight - outer_tau_numer_) < 1e-10);
+
+  double result_r_weight = gadget_.total_wt_r_ + transferred_weight;
+  uint64_t result_n = n_;
+    
+  // explicitly set weight value for the gap
+  //data[result_h] = nullptr;  invalid not a pointer
+  wts[result_h] = -1.0;
+
+  // clean up arrays in input sketch, replace with new values
+  typedef typename std::allocator_traits<A>::template rebind_alloc<bool> AllocBool;
+  AllocBool().deallocate(sk.marks_, sk.curr_items_alloc_);
+  AllocDouble().deallocate(sk.weights_, sk.curr_items_alloc_);
+  for (size_t i = 0; i < result_k; ++i) { A().destroy(sk.data_ + i); } // assumes everything in H region, no gap
+  A().deallocate(sk.data_, sk.curr_items_alloc_);
+
+  sk.data_ = data;
+  sk.weights_ = wts;
+  sk.marks_ = nullptr;
+  sk.num_marks_in_h_ = 0;
+  sk.curr_items_alloc_ = result_k + 1;
+  sk.k_ = result_k;
+  sk.n_ = result_n;
+  sk.h_ = result_h;
+  sk.r_ = result_r;
+  sk.total_wt_r_ = result_r_weight;
+}
+
+// this is basically a continuation of get_result(), but modifying the input gadget copy
+template<typename T, typename S, typename A>
+void var_opt_union<T,S,A>::migrate_marked_items_by_decreasing_k(var_opt_sketch<T,S,A>& gcopy) const {
+  uint32_t r_count = gcopy.r_;
+  uint32_t h_count = gcopy.h_;
+  uint32_t k = gcopy.k_;
+
+  assert(gcopy.num_marks_in_h_ > 0); // ensured by caller
+  // either full (of samples), or in pseudo-exact mode, or both
+  assert((r_count == 0) || (k == (h_count + r_count)));
+
+  // if non-full and pseudo-exact, change k so that gcopy is full
+  if ((r_count == 0) && (h_count < k)) {
+    gcopy.k_ = h_count; // may leve extra space allocated but that's ok
+  }
+
+  // Now k equals the number of samples, so reducing k will increase tau.
+  // Also, we know that there are at least 2 samples because 0 or 1 would have been handled
+  // by the earlier logic in get_result()
+  assert(gcopy.k_ >= 2);
+  gcopy.decrease_k_by_1();
+
+  // gcopy is now in estimation mode, just like the final result must be (due to marked items)
+  assert(gcopy.r_ > 0);
+  assert(gcopy.get_tau() > 0.0);
+
+  // keep reducing k until all marked items have been absorbed into the reservoir
+  while (gcopy.num_marks_in_h_ > 0) {
+    assert(gcopy.k_ >= 2); // because h_ and r_ are both at least 1
+    gcopy.decrease_k_by_1();
+  }
+
+  gcopy.strip_marks();
+}
+
+
+} // namespace datasketches
+
+#endif // _VAR_OPT_UNION_IMPL_HPP_
diff --git a/sampling/test/counting_allocator.hpp b/sampling/test/counting_allocator.hpp
deleted file mode 100644
index e4c546f..0000000
--- a/sampling/test/counting_allocator.hpp
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#ifndef COUNTING_ALLOCATOR_H
-#define COUNTING_ALLOCATOR_H
-
-#include <new>
-#include <utility>
-
-namespace datasketches {
-
-// this relies on global variables to track allocated memory
-extern long long int total_allocated_memory;
-extern long long int total_objects_constructed;
-
-template <class T> class counting_allocator {
-public:
-  typedef T                 value_type;
-  typedef value_type*       pointer;
-  typedef const value_type* const_pointer;
-  typedef value_type&       reference;
-  typedef const value_type& const_reference;
-  typedef std::size_t       size_type;
-  typedef std::ptrdiff_t    difference_type;
-
-  template <class U>
-  struct rebind { typedef counting_allocator<U> other; };
-
-  counting_allocator() {}
-  counting_allocator(const counting_allocator&) {}
-  template <class U>
-  counting_allocator(const counting_allocator<U>&) {}
-  ~counting_allocator() {}
-
-  pointer address(reference x) const { return &x; }
-  const_pointer address(const_reference x) const {
-    return &x;
-  }
-
-  pointer allocate(size_type n, const_pointer = 0) {
-    void* p = malloc(n * sizeof(T));
-    if (!p) throw std::bad_alloc();
-    total_allocated_memory += n * sizeof(T);
-    return static_cast<pointer>(p);
-  }
-
-  void deallocate(pointer p, size_type n) {
-    if (p) free(p);
-    total_allocated_memory -= n * sizeof(T);
-  }
-
-  size_type max_size() const {
-    return static_cast<size_type>(-1) / sizeof(T);
-  }
-
-  template<typename... Args>
-  void construct(pointer p, Args&&... args) {
-    new(p) value_type(std::forward<Args>(args)...);
-    ++total_objects_constructed;
-  }
-  void destroy(pointer p) {
-    p->~value_type();
-    --total_objects_constructed;
-  }
-
-private:
-  void operator=(const counting_allocator&);
-};
-
-template<> class counting_allocator<void> {
-public:
-  typedef void        value_type;
-  typedef void*       pointer;
-  typedef const void* const_pointer;
-
-  template <class U>
-  struct rebind { typedef counting_allocator<U> other; };
-};
-
-
-template <class T>
-inline bool operator==(const counting_allocator<T>&, const counting_allocator<T>&) {
-  return true;
-}
-
-template <class T>
-inline bool operator!=(const counting_allocator<T>&, const counting_allocator<T>&) {
-  return false;
-}
-
-}
-
-#endif
diff --git a/sampling/test/var_opt_sketch_test.cpp b/sampling/test/var_opt_sketch_test.cpp
index ba75ba3..7251eb7 100644
--- a/sampling/test/var_opt_sketch_test.cpp
+++ b/sampling/test/var_opt_sketch_test.cpp
@@ -23,6 +23,7 @@
 #include "counting_allocator.hpp"
 
 #include <var_opt_sketch.hpp>
+#include <var_opt_union.hpp>
 
 #include <string>
 #include <sstream>
@@ -36,16 +37,35 @@ static std::string testBinaryInputPath = "test/";
 
 namespace datasketches {
 
-long long int total_allocated_memory;
-long long int total_objects_constructed;
-
 class var_opt_sketch_test: public CppUnit::TestFixture {
 
   CPPUNIT_TEST_SUITE(var_opt_sketch_test);
   CPPUNIT_TEST(empty);
-  CPPUNIT_TEST(mem_test);
+  CPPUNIT_TEST(vo_union);
   CPPUNIT_TEST_SUITE_END();
 
+  void vo_union() {
+    int k = 10;
+    var_opt_sketch<int> sk(k), sk2(k+3);
+
+    for (int i = 0; i < 10*k; ++i) {
+      sk.update(i);
+      sk2.update(i);
+    }
+    sk.update(-1, 10000.0);
+    sk2.update(-2, 4000.0);
+    std::cerr << sk.to_string() << std::endl;
+
+    var_opt_union<int> vou(k+3);
+    std::cerr << vou.to_string() << std::endl;
+    vou.update(sk);
+    vou.update(sk2);
+    std::cerr << vou.to_string() << std::endl;
+
+    var_opt_sketch<int> r = vou.get_result();
+    std::cerr << "-----------------------" << std::endl << r.to_string() << std::endl;
+  }
+
   void empty() {
     int k = 10;
 
@@ -64,7 +84,7 @@ class var_opt_sketch_test: public CppUnit::TestFixture {
     var_opt_sketch<int> sk2 = var_opt_sketch<int>::deserialize(ss);
     std::cout << sk2.to_string() << std::endl;;
     }
-    std::cerr << "num allocs: " << num_allocs << "\n";
+
     {
     var_opt_sketch<std::string> sk(k);
     std::cout << "Expected size: " << sk.get_serialized_size_bytes() << std::endl;
@@ -112,91 +132,6 @@ class var_opt_sketch_test: public CppUnit::TestFixture {
     }
   }
 
-  void print_mem_stats() {
-    std::cout << "construct(): " << total_objects_constructed << std::endl;
-    std::cout << "memory used: " << total_allocated_memory << std::endl;
-  }
-
-  void mem_test() {
-    int k = 10;
-
-    total_allocated_memory = 0;
-    total_objects_constructed = 0;
-
-    typedef var_opt_sketch<std::string, serde<std::string>, counting_allocator<std::string>> var_opt_sketch_a;
-    var_opt_sketch_a* s = new (counting_allocator<var_opt_sketch_a>().allocate(1)) var_opt_sketch_a(k);
-  
-    std::string x[26];
-    x[0]  = std::string("a");
-    x[1]  = std::string("b");
-    x[2]  = std::string("c");
-    x[3]  = std::string("d");
-    x[4]  = std::string("e");
-    x[5]  = std::string("f");
-    x[6]  = std::string("g");
-    x[7]  = std::string("h");
-    x[8]  = std::string("i");
-    x[9]  = std::string("j");
-    x[10] = std::string("k");
-    x[11] = std::string("l");
-    x[12] = std::string("m");
-    x[13] = std::string("n");
-    x[14] = std::string("o");
-    x[15] = std::string("p");
-    x[16] = std::string("q");
-    x[17] = std::string("r");
-    x[18] = std::string("s");
-    x[19] = std::string("t");
-    x[20] = std::string("u");
-    x[21] = std::string("v");
-    x[22] = std::string("w");
-    x[23] = std::string("x");
-    x[24] = std::string("y");
-    x[25] = std::string("z");
-
-    for (int i=0; i <5; ++i)
-      s->update(x[i]);
-    print_mem_stats();
-
-    for (int i=5; i <11; ++i)
-      s->update(x[i]);
-    print_mem_stats();
-
-    s->update(x[11], 10000);
-    print_mem_stats();
-
-    for (int i=12; i <26; ++i)
-      s->update(x[i]);
-    print_mem_stats();
-
-    std::stringstream ss(std::ios::in | std::ios::out | std::ios::binary);
-    s->serialize(ss);
-    std::cout << s->to_string() << std::endl;
-    print_mem_stats();
-
-    counting_allocator<var_opt_sketch_a>().destroy(s);
-    counting_allocator<var_opt_sketch_a>().deallocate(s, 1);
-    print_mem_stats();
-
-    {
-      auto sk = var_opt_sketch_a::deserialize(ss);
-      print_mem_stats();
-
-      sk.update("qrs");
-      print_mem_stats();
-
-      sk.update("zyx");
-      print_mem_stats();
-
-      std::cout << sk.to_string() << std::endl;
-
-      auto vec = sk.serialize();
-      var_opt_sketch_a sk2 = var_opt_sketch_a::deserialize(vec.data(), vec.size());
-      std::cout << sk2.to_string() << std::endl;
-    }
-    print_mem_stats();
-  }
-
 };
 
 CPPUNIT_TEST_SUITE_REGISTRATION(var_opt_sketch_test);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org