You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by jm...@apache.org on 2022/01/05 23:46:01 UTC

[datasketches-cpp] 01/01: WIP initial commit of classic quantiles. incomplete, but the basic algo does seem to work.

This is an automated email from the ASF dual-hosted git repository.

jmalkin pushed a commit to branch quantiles
in repository https://gitbox.apache.org/repos/asf/datasketches-cpp.git

commit e7fd82cc465fe716a1163a5030c4ad94a727ac99
Author: Jon Malkin <jm...@users.noreply.github.com>
AuthorDate: Wed Jan 5 15:45:47 2022 -0800

    WIP initial commit of classic quantiles. incomplete, but the basic algo does seem to work.
---
 CMakeLists.txt                              |   3 +-
 common/include/common_defs.hpp              |   6 +
 quantiles/CMakeLists.txt                    |  48 ++
 quantiles/include/quantiles_sketch.hpp      | 587 +++++++++++++++++++++
 quantiles/include/quantiles_sketch_impl.hpp | 672 +++++++++++++++++++++++
 quantiles/test/CMakeLists.txt               |  45 ++
 quantiles/test/quantiles_sketch_test.cpp    | 792 ++++++++++++++++++++++++++++
 7 files changed, 2152 insertions(+), 1 deletion(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9cd8d4a..39a8710 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -106,12 +106,13 @@ add_subdirectory(theta)
 add_subdirectory(sampling)
 add_subdirectory(tuple)
 add_subdirectory(req)
+add_subdirectory(quantiles)
 
 if (WITH_PYTHON)
   add_subdirectory(python)
 endif()
 
-target_link_libraries(datasketches INTERFACE hll cpc kll fi theta sampling)
+target_link_libraries(datasketches INTERFACE hll cpc kll fi theta sampling req quantiles)
 
 if (COVERAGE)
   find_program(LCOV_PATH NAMES "lcov")
diff --git a/common/include/common_defs.hpp b/common/include/common_defs.hpp
index dadcaac..fa3a5ee 100644
--- a/common/include/common_defs.hpp
+++ b/common/include/common_defs.hpp
@@ -24,6 +24,7 @@
 #include <string>
 #include <memory>
 #include <iostream>
+#include <random>
 
 namespace datasketches {
 
@@ -34,6 +35,11 @@ enum resize_factor { X1 = 0, X2, X4, X8 };
 template<typename A> using AllocChar = typename std::allocator_traits<A>::template rebind_alloc<char>;
 template<typename A> using string = std::basic_string<char, std::char_traits<char>, AllocChar<A>>;
 
+// random bit source for coin flips; seeded from std::random_device (provided by <random>), so no <chrono> include is required
+static std::independent_bits_engine<std::mt19937, 1, uint32_t>
+  random_bit(static_cast<uint32_t>(std::random_device{}()));
+
+
 // utility function to hide unused compiler warning
 // usually has no additional cost
 template<typename T> void unused(T&&...) {}
diff --git a/quantiles/CMakeLists.txt b/quantiles/CMakeLists.txt
new file mode 100755
index 0000000..ca9b583
--- /dev/null
+++ b/quantiles/CMakeLists.txt
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+add_library(quantiles INTERFACE)
+
+add_library(${PROJECT_NAME}::QUANTILES ALIAS quantiles)
+
+if (BUILD_TESTS)
+  add_subdirectory(test)
+endif()
+
+target_include_directories(quantiles
+  INTERFACE
+    $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+    $<INSTALL_INTERFACE:$<INSTALL_PREFIX>/include>
+)
+
+target_link_libraries(quantiles INTERFACE common)
+target_compile_features(quantiles INTERFACE cxx_std_11)
+
+install(TARGETS quantiles
+  EXPORT ${PROJECT_NAME}
+)
+
+install(FILES
+    include/quantiles_sketch.hpp
+    include/quantiles_sketch_impl.hpp
+    #include/quantiles_level.hpp  # TODO: this and the entries below are not in the tree yet; enable each once its file is added
+    #include/quantiles_level_impl.hpp
+    #include/quantiles_helper.hpp
+    #include/quantiles_helper_impl.hpp
+    #include/quantiles_quantile_calculator.hpp
+    #include/quantiles_quantile_calculator_impl.hpp
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/DataSketches")
diff --git a/quantiles/include/quantiles_sketch.hpp b/quantiles/include/quantiles_sketch.hpp
new file mode 100644
index 0000000..3587a94
--- /dev/null
+++ b/quantiles/include/quantiles_sketch.hpp
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _QUANTILES_SKETCH_HPP_
+#define _QUANTILES_SKETCH_HPP_
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "../../req/include/req_quantile_calculator.hpp"
+#include "common_defs.hpp"
+#include "serde.hpp"
+
+namespace datasketches {
+
+/**
+ * This is a stochastic streaming sketch that enables near-real time analysis of the
+ * approximate distribution of real values from a very large stream in a single pass.
+ * The analysis is obtained using the get_quantiles() functions or their inverse functions: the
+ * Probability Mass Function from get_PMF() and the Cumulative Distribution Function from get_CDF().
+ *
+ * <p>Consider a large stream of one million values such as packet sizes coming into a network node.
+ * The absolute rank of any specific size value is simply its index in the hypothetical sorted
+ * array of values.
+ * The normalized rank (or fractional rank) is the absolute rank divided by the stream size,
+ * in this case one million.
+ * The value corresponding to the normalized rank of 0.5 represents the 50th percentile or median
+ * value of the distribution, or get_quantile(0.5).  Similarly, the 95th percentile is obtained from
+ * get_quantile(0.95). Calling get_quantiles() with the fractions {0.0, 1.0} will return the min and
+ * max values seen by the sketch.</p>
+ *
+ * <p>From the min and max values, for example, 1 and 1000 bytes,
+ * you can obtain the PMF from get_PMF() with the split points {100, 500, 900}, which will result in
+ * an array of 4 fractional values such as {.4, .3, .2, .1}, which means that
+ * <ul>
+ * <li>40% of the values were &lt; 100,</li>
+ * <li>30% of the values were &ge; 100 and &lt; 500,</li>
+ * <li>20% of the values were &ge; 500 and &lt; 900, and</li>
+ * <li>10% of the values were &ge; 900.</li>
+ * </ul>
+ * A frequency histogram can be obtained by simply multiplying these fractions by get_n(),
+ * which is the total count of values received.
+ * get_CDF() works similarly, but produces the cumulative distribution instead.
+ *
+ * <p>As of November 2021, this implementation produces serialized sketches which are binary-compatible
+ * with the equivalent Java implementation only when template parameter T = double
+ * (64-bit double precision values).
+
+ * 
+ * <p>The accuracy of this sketch is a function of the configured value <i>k</i>, which also affects
+ * the overall size of the sketch. Accuracy of this quantile sketch is always with respect to
+ * the normalized rank.  A <i>k</i> of 128 produces a normalized rank error of about 1.7%.
+ * For example, the median value returned from get_quantile(0.5) will be between the actual values
+ * from the hypothetically sorted array of input values at normalized ranks of 0.483 and 0.517, with
+ * a confidence of about 99%.</p>
+ *
+ * <pre>
+Table Guide for DoublesSketch Size in Bytes and Approximate Error:
+          K =&gt; |      16      32      64     128     256     512   1,024
+    ~ Error =&gt; | 12.145%  6.359%  3.317%  1.725%  0.894%  0.463%  0.239%
+             N | Size in Bytes -&gt;
+------------------------------------------------------------------------
+             0 |       8       8       8       8       8       8       8
+             1 |      72      72      72      72      72      72      72
+             3 |      72      72      72      72      72      72      72
+             7 |     104     104     104     104     104     104     104
+            15 |     168     168     168     168     168     168     168
+            31 |     296     296     296     296     296     296     296
+            63 |     424     552     552     552     552     552     552
+           127 |     552     808   1,064   1,064   1,064   1,064   1,064
+           255 |     680   1,064   1,576   2,088   2,088   2,088   2,088
+           511 |     808   1,320   2,088   3,112   4,136   4,136   4,136
+         1,023 |     936   1,576   2,600   4,136   6,184   8,232   8,232
+         2,047 |   1,064   1,832   3,112   5,160   8,232  12,328  16,424
+         4,095 |   1,192   2,088   3,624   6,184  10,280  16,424  24,616
+         8,191 |   1,320   2,344   4,136   7,208  12,328  20,520  32,808
+        16,383 |   1,448   2,600   4,648   8,232  14,376  24,616  41,000
+        32,767 |   1,576   2,856   5,160   9,256  16,424  28,712  49,192
+        65,535 |   1,704   3,112   5,672  10,280  18,472  32,808  57,384
+       131,071 |   1,832   3,368   6,184  11,304  20,520  36,904  65,576
+       262,143 |   1,960   3,624   6,696  12,328  22,568  41,000  73,768
+       524,287 |   2,088   3,880   7,208  13,352  24,616  45,096  81,960
+     1,048,575 |   2,216   4,136   7,720  14,376  26,664  49,192  90,152
+     2,097,151 |   2,344   4,392   8,232  15,400  28,712  53,288  98,344
+     4,194,303 |   2,472   4,648   8,744  16,424  30,760  57,384 106,536
+     8,388,607 |   2,600   4,904   9,256  17,448  32,808  61,480 114,728
+    16,777,215 |   2,728   5,160   9,768  18,472  34,856  65,576 122,920
+    33,554,431 |   2,856   5,416  10,280  19,496  36,904  69,672 131,112
+    67,108,863 |   2,984   5,672  10,792  20,520  38,952  73,768 139,304
+   134,217,727 |   3,112   5,928  11,304  21,544  41,000  77,864 147,496
+   268,435,455 |   3,240   6,184  11,816  22,568  43,048  81,960 155,688
+   536,870,911 |   3,368   6,440  12,328  23,592  45,096  86,056 163,880
+ 1,073,741,823 |   3,496   6,696  12,840  24,616  47,144  90,152 172,072
+ 2,147,483,647 |   3,624   6,952  13,352  25,640  49,192  94,248 180,264
+ 4,294,967,295 |   3,752   7,208  13,864  26,664  51,240  98,344 188,456
+
+ * </pre>
+
+ * <p>There is more documentation available on
+ * <a href="https://datasketches.apache.org">datasketches.apache.org</a>.</p>
+ *
+ * <p>This is an implementation of the Low Discrepancy Mergeable Quantiles Sketch, using double
+ * values, described in section 3.2 of the journal version of the paper "Mergeable Summaries"
+ * by Agarwal, Cormode, Huang, Phillips, Wei, and Yi.
+ * <a href="http://dblp.org/rec/html/journals/tods/AgarwalCHPWY13"></a></p>
+ *
+ * <p>This algorithm is independent of the distribution of values, which can be anywhere in the
+ * range of the IEEE-754 64-bit doubles.
+ *
+ * <p>This algorithm intentionally inserts randomness into the sampling process for values that
+ * ultimately get retained in the sketch. The results produced by this algorithm are not
+ * deterministic. For example, if the same stream is inserted into two different instances of this
+ * sketch, the answers obtained from the two sketches may not be identical.</p>
+ *
+ * <p>Similarly, there may be directional inconsistencies. For example, the resulting array of
+ * values obtained from get_quantiles(fractions) input into the reverse directional query
+ * get_PMF(split_points) may not result in the original fractional values.</p>
+ *
+ * @author Kevin Lang
+ * @author Lee Rhodes
+ * @author Alexander Saydakov
+ * @author Jon Malkin
+ */
+
+namespace quantiles_constants {
+  const uint16_t DEFAULT_K = 128;
+  const uint16_t MIN_K = 2;
+  const uint16_t MAX_K = 1 << 15;
+}
+
+template <typename T,
+          typename Comparator = std::less<T>,
+          typename SerDe = serde<T>,
+          typename Allocator = std::allocator<T>>
+class quantiles_sketch {
+public:
+  using C = Comparator;
+  using S = SerDe;
+  using A = Allocator;
+  using AllocDouble = typename std::allocator_traits<Allocator>::template rebind_alloc<double>;
+  using vector_double = std::vector<double, AllocDouble>;
+
+  explicit quantiles_sketch(uint16_t k = quantiles_constants::DEFAULT_K, const Allocator& allocator = Allocator());
+  quantiles_sketch(const quantiles_sketch& other);
+  quantiles_sketch(quantiles_sketch&& other) noexcept;
+  ~quantiles_sketch();
+  quantiles_sketch& operator=(const quantiles_sketch& other);
+  quantiles_sketch& operator=(quantiles_sketch&& other) noexcept;
+
+  /**
+   * Updates this sketch with the given data item.
+   * @param value an item from a stream of items
+   */
+  template<typename FwdT>
+  void update(FwdT&& value);
+
+  /**
+   * Merges another sketch into this one.
+   * @param other sketch to merge into this one
+   */
+  template<typename FwdSk>
+  void merge(FwdSk&& other);
+
+  /**
+   * Returns true if this sketch is empty.
+   * @return empty flag
+   */
+  bool is_empty() const;
+
+  /**
+   * Returns configured parameter k
+   * @return parameter k
+   */
+  uint16_t get_k() const;
+
+  /**
+   * Returns the length of the input stream.
+   * @return stream length
+   */
+  uint64_t get_n() const;
+
+  /**
+   * Returns the number of retained items (samples) in the sketch.
+   * @return the number of retained items
+   */
+  uint32_t get_num_retained() const;
+
+  /**
+   * Returns true if this sketch is in estimation mode.
+   * @return estimation mode flag
+   */
+  bool is_estimation_mode() const;
+
+  /**
+   * Returns the min value of the stream.
+   * For floating point types: if the sketch is empty this returns NaN.
+   * For other types: if the sketch is empty this throws runtime_error.
+   * @return the min value of the stream
+   */
+  const T& get_min_value() const;
+
+  /**
+   * Returns the max value of the stream.
+   * For floating point types: if the sketch is empty this returns NaN.
+   * For other types: if the sketch is empty this throws runtime_error.
+   * @return the max value of the stream
+   */
+  const T& get_max_value() const;
+
+  /**
+   * Returns an approximation to the value of the data item
+   * that would be preceded by the given fraction of a hypothetical sorted
+   * version of the input stream so far.
+   * <p>
+   * Note that this method has a fairly large overhead (microseconds instead of nanoseconds)
+   * so it should not be called multiple times to get different quantiles from the same
+   * sketch. Instead use get_quantiles(), which pays the overhead only once.
+   * <p>
+   * For floating point types: if the sketch is empty this returns NaN.
+   * For other types: if the sketch is empty this throws runtime_error.
+   *
+   * @param rank the specified fractional position in the hypothetical sorted stream.
+   * These are also called normalized ranks or fractional ranks.
+   * If rank = 0.0, the true minimum value of the stream is returned.
+   * If rank = 1.0, the true maximum value of the stream is returned.
+   *
+   * @return the approximation to the value at the given rank
+   */
+  template<bool inclusive = false>
+  const T& get_quantile(double rank) const;
+
+  /**
+   * This is a more efficient multiple-query version of get_quantile().
+   * <p>
+   * This returns an array that could have been generated by using get_quantile() for each
+   * fractional rank separately, but doing so would be very inefficient.
+   * This method incurs the internal set-up overhead once and obtains multiple quantile values in
+   * a single query. It is strongly recommended that this method be used instead of multiple calls
+   * to get_quantile().
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param fractions given array of fractional positions in the hypothetical sorted stream.
+   * These are also called normalized ranks or fractional ranks.
+   * These fractions must be in the interval [0.0, 1.0], inclusive.
+   *
+   * @return array of approximations to the given fractions in the same order as given fractions
+   * in the input array.
+   */
+  template<bool inclusive = false>
+  std::vector<T, Allocator> get_quantiles(const double* fractions, uint32_t size) const;
+
+  /**
+   * This is a multiple-query version of get_quantile() that allows the caller to
+   * specify the number of evenly-spaced fractional ranks.
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param num an integer that specifies the number of evenly-spaced fractional ranks.
+   * This must be an integer greater than 0. A value of 1 will return the min value.
+   * A value of 2 will return the min and the max value. A value of 3 will return the min,
+   * the median and the max value, etc.
+   *
+   * @return array of approximations to the given number of evenly-spaced fractional ranks.
+   */
+  template<bool inclusive = false>
+  std::vector<T, Allocator> get_quantiles(uint32_t num) const;
+
+  /**
+   * Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1,
+   * inclusive.
+   *
+   * <p>The resulting approximation has a probabilistic guarantee that can be obtained from the
+   * get_normalized_rank_error(false) function.
+   *
+   * <p>If the sketch is empty this returns NaN.
+   *
+   * @param value to be ranked
+   * @return an approximate rank of the given value
+   */
+  template<bool inclusive = false>
+  double get_rank(const T& value) const;
+
+  /**
+   * Returns an approximation to the Probability Mass Function (PMF) of the input stream
+   * given a set of split points (values).
+   *
+   * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+   * get_normalized_rank_error(true) function.
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param split_points an array of <i>m</i> unique, monotonically increasing values
+   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+   * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+   * exclusive of the right split point, with the exception that the last interval will include
+   * the maximum value.
+   * It is not necessary to include either the min or max values in these split points.
+   *
+   * @return an array of m+1 doubles each of which is an approximation
+   * to the fraction of the input stream values (the mass) that fall into one of those intervals.
+   * The definition of an "interval" is inclusive of the left split point and exclusive of the right
+   * split point, with the exception that the last interval will include the maximum value.
+   */
+  template<bool inclusive = false>
+  vector_double get_PMF(const T* split_points, uint32_t size) const;
+
+  /**
+   * Returns an approximation to the Cumulative Distribution Function (CDF), which is the
+   * cumulative analog of the PMF, of the input stream given a set of split points (values).
+   *
+   * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+   * get_normalized_rank_error(false) function.
+   *
+   * <p>If the sketch is empty this returns an empty vector.
+   *
+   * @param split_points an array of <i>m</i> unique, monotonically increasing values
+   * that divide the input domain into <i>m+1</i> consecutive disjoint intervals.
+   * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+   * exclusive of the right split point, with the exception that the last interval will include
+   * the maximum value.
+   * It is not necessary to include either the min or max values in these split points.
+   *
+   * @return an array of m+1 double values, which are a consecutive approximation to the CDF
+   * of the input stream given the split_points. The value at array position j of the returned
+   * CDF array is the sum of the returned values in positions 0 through j of the returned PMF
+   * array.
+   */
+  template<bool inclusive = false>
+  vector_double get_CDF(const T* split_points, uint32_t size) const;
+
+  /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for fixed-size arithmetic types (integral and floating point).
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
+   * Computes size needed to serialize the current state of the sketch.
+   * This version is for all other types and can be expensive since every item needs to be looked at.
+   * @return size in bytes needed to serialize this sketch
+   */
+  template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+  size_t get_serialized_size_bytes() const;
+
+  /**
+   * This method serializes the sketch into a given stream in a binary form
+   * @param os output stream
+   */
+  void serialize(std::ostream& os) const;
+
+  // This is a convenience alias for users
+  // The type returned by the following serialize method
+  using vector_bytes = std::vector<uint8_t, typename std::allocator_traits<Allocator>::template rebind_alloc<uint8_t>>;
+
+  /**
+   * This method serializes the sketch as a vector of bytes.
+   * An optional header can be reserved in front of the sketch.
+   * It is a blank space of a given size.
+   * This header is used in Datasketches PostgreSQL extension.
+   * @param header_size_bytes space to reserve in front of the sketch
+   */
+  vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
+  /**
+   * This method deserializes a sketch from a given stream.
+   * @param is input stream
+   * @return an instance of a sketch
+   */
+  static quantiles_sketch<T, C, S, A> deserialize(std::istream& is, const A& allocator = A());
+
+  /**
+   * This method deserializes a sketch from a given array of bytes.
+   * @param bytes pointer to the array of bytes
+   * @param size the size of the array
+   * @return an instance of a sketch
+   */
+  static quantiles_sketch<T, C, S, A> deserialize(const void* bytes, size_t size, const A& allocator = A());
+
+  /**
+   * Gets the normalized rank error for this sketch. Constants were derived as the best fit to 99 percentile
+   * empirically measured max error in thousands of trials.
+   * @param is_pmf if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+   *               Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+   * @return the normalized rank error for the sketch
+   */
+  double get_normalized_rank_error(bool is_pmf) const;
+
+  /**
+   * Gets the normalized rank error given k and pmf. Constants were derived as the best fit to 99 percentile
+   * empirically measured max error in thousands of trials.
+   * @param k  the configuration parameter
+   * @param is_pmf if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+   *               Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+   * @return the normalized rank error for the given parameters
+   */
+  static double get_normalized_rank_error(uint16_t k, bool is_pmf);
+
+  /**
+   * Prints a summary of the sketch.
+   * @param print_levels if true include information about levels
+   * @param print_items if true include sketch data
+   */
+  string<A> to_string(bool print_levels = false, bool print_items = false) const;
+
+  class const_iterator;
+  const_iterator begin() const;
+  const_iterator end() const;
+
+private:
+  using Level = std::vector<T, Allocator>;
+  using AllocLevel = typename std::allocator_traits<Allocator>::template rebind_alloc<Level>;
+
+  // TODO: FIX THIS!
+  /* Serialized sketch layout:
+   *  Adr:
+   *      ||    7    |   6   |    5   |    4   |    3   |    2    |    1   |      0       |
+   *  0   || unused  |   M   |--------K--------|  Flags |  FamID  | SerVer | PreambleInts |
+   *      ||   15    |   14  |   13   |   12   |   11   |   10    |    9   |      8       |
+   *  1   ||-----------------------------------N------------------------------------------|
+   *      ||   23    |   22  |   21   |   20   |   19   |    18   |   17   |      16      |
+   *  2   ||---------------data----------------|-unused-|numLevels|-------min K-----------|
+   */
+
+  static const size_t EMPTY_SIZE_BYTES = 8;
+  static const size_t DATA_START_SINGLE_ITEM = 8;
+  static const size_t DATA_START = 20;
+
+  static const uint8_t SERIAL_VERSION_2 = 1;
+  static const uint8_t SERIAL_VERSION_3 = 2;
+  static const uint8_t FAMILY = 15;
+
+  enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
+
+  static const uint8_t PREAMBLE_LONGS_SHORT = 1; // for empty and single item
+  static const uint8_t PREAMBLE_LONGS_FULL = 2;
+
+  Allocator allocator_;
+  uint16_t k_;
+  uint64_t n_;
+  uint64_t bit_pattern_;
+  Level base_buffer_;
+  std::vector<Level, AllocLevel> levels_;
+  T* min_value_;
+  T* max_value_;
+
+  using QuantileCalculator = req_quantile_calculator<T, Comparator, Allocator>;
+  using AllocCalc = typename std::allocator_traits<Allocator>::template rebind_alloc<QuantileCalculator>;
+  class calculator_deleter;
+  using QuantileCalculatorPtr = typename std::unique_ptr<QuantileCalculator, calculator_deleter>;
+  template<bool inclusive>
+  QuantileCalculatorPtr get_quantile_calculator() const;
+
+  // for deserialization
+  class item_deleter;
+  class items_deleter;
+  quantiles_sketch(uint16_t k, uint16_t base_buffer_count, uint32_t items_size, uint64_t n, uint8_t bit_pattern,
+      std::unique_ptr<T, items_deleter> items, std::unique_ptr<T, item_deleter> min_value,
+      std::unique_ptr<T, item_deleter> max_value);
+
+  void grow_base_buffer();
+  void process_full_base_buffer();
+
+  // returns true if size adjusted, else false
+  bool grow_levels_if_needed();
+
+  // buffers should be pre-sized to target capacity as appropriate
+  static void in_place_propagate_carry(uint8_t starting_level, Level& buf_size_k,
+                                       Level& buf_size_2k, bool apply_as_update,
+                                       quantiles_sketch<T,C,S,A>& sketch);
+  static void zip_buffer(Level& buf_in, Level& buf_out);
+  static void merge_two_size_k_buffers(Level& arr_in_1, Level& arr_in_2, Level& arr_out);
+
+  vector_double get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
+
+  static void check_preamble_longs(uint8_t preamble_ints, uint8_t flags_byte);
+  static void check_serial_version(uint8_t serial_version);
+  static void check_family_id(uint8_t family_id);
+
+  static uint32_t compute_retained_items(uint16_t k, uint64_t n);
+  static uint32_t compute_base_buffer_items(uint16_t k, uint64_t n);
+  static uint64_t compute_bit_pattern(uint16_t k, uint64_t n);
+  static uint32_t compute_valid_levels(uint64_t bit_pattern);
+  static uint8_t compute_levels_needed(uint16_t k, uint64_t n);
+
+  /**
+   * Returns the zero-based bit position of the lowest zero bit of <i>bits</i> starting at
+   * <i>starting_bit</i>. If input is all ones, this returns 64.
+   * @param bits the input bits as a 64-bit unsigned integer
+   * @param starting_bit the zero-based starting bit position. Only the low 6 bits are used.
+   * @return the zero-based bit position of the lowest zero bit starting at <i>starting_bit</i>.
+   */
+  static int lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit);
+
+  // implementations for floating point types
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static const TT& get_invalid_value() {
+    static TT value = std::numeric_limits<TT>::quiet_NaN();
+    return value;
+  }
+
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static inline bool check_update_value(TT value) {
+    return !std::isnan(value);
+  }
+
+  template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+  static inline void check_split_points(const T* values, uint32_t size) {
+    for (uint32_t i = 0; i < size ; i++) {
+      if (std::isnan(values[i])) {
+        throw std::invalid_argument("Values must not be NaN");
+      }
+      if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+        throw std::invalid_argument("Values must be unique and monotonically increasing");
+      }
+    }
+  }
+
+  // implementations for all other types
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static const TT& get_invalid_value() {
+    throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
+  }
+
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static inline bool check_update_value(TT) {
+    return true;
+  }
+
+  template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+  static inline void check_split_points(const T* values, uint32_t size) {
+    for (uint32_t i = 0; i < size ; i++) {
+      if ((i < (size - 1)) && !(Comparator()(values[i], values[i + 1]))) {
+        throw std::invalid_argument("Values must be unique and monotonically increasing");
+      }
+    }
+  }
+};
+
+
+template<typename T, typename C, typename S, typename A>
+class quantiles_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+  const_iterator& operator++();
+  const_iterator& operator++(int);
+  bool operator==(const const_iterator& other) const;
+  bool operator!=(const const_iterator& other) const;
+  std::pair<const T&, const uint64_t> operator*() const;
+private:
+  friend class quantiles_sketch<T, C, S, A>;
+  using Level = std::vector<T, A>;
+  using AllocLevel = typename std::allocator_traits<A>::template rebind_alloc<Level>;
+  Level base_buffer_;
+  std::vector<Level, AllocLevel> levels_;
+  int level_;
+  uint32_t index_;
+  uint16_t k_;
+  uint16_t bb_count_;
+  uint64_t bit_pattern_;
+  uint64_t weight_;
+  const_iterator(const Level& base_buffer, const std::vector<Level, AllocLevel>& levels, uint16_t k, uint64_t n, bool is_end);
+};
+
+} /* namespace datasketches */
+
+#include "quantiles_sketch_impl.hpp"
+
+#endif // _QUANTILES_SKETCH_HPP_
diff --git a/quantiles/include/quantiles_sketch_impl.hpp b/quantiles/include/quantiles_sketch_impl.hpp
new file mode 100644
index 0000000..5e1bbfc
--- /dev/null
+++ b/quantiles/include/quantiles_sketch_impl.hpp
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _QUANTILES_SKETCH_IMPL_HPP_
+#define _QUANTILES_SKETCH_IMPL_HPP_
+
+#include <cmath>
+#include <algorithm>
+#include <stdexcept>
+
+#include "common_defs.hpp"
+#include "count_zeros.hpp"
+#include "quantiles_sketch.hpp"
+
+namespace datasketches {
+
+// Creates an empty sketch with the given parameter k, using the given allocator.
+// Throws std::invalid_argument if k is outside [MIN_K, MAX_K].
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>::quantiles_sketch(uint16_t k, const A& allocator):
+allocator_(allocator),
+k_(k),
+n_(0),
+bit_pattern_(0),
+base_buffer_(allocator_),
+levels_(allocator_),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+  const bool too_small = k < quantiles_constants::MIN_K;
+  const bool too_large = k > quantiles_constants::MAX_K;
+  if (too_small || too_large) {
+    throw std::invalid_argument("K must be >= " + std::to_string(quantiles_constants::MIN_K) + " and <= " + std::to_string(quantiles_constants::MAX_K) + ": " + std::to_string(k));
+  }
+  // start with a small base buffer; it is grown on demand by update()
+  const auto initial_half = std::min(quantiles_constants::MIN_K, k);
+  base_buffer_.reserve(2 * initial_half);
+}
+
+// Copy constructor: copies all fields and, if present, makes separately allocated copies
+// of the min and max values of the other sketch.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>::quantiles_sketch(const quantiles_sketch& other):
+allocator_(other.allocator_),
+k_(other.k_),
+n_(other.n_),
+bit_pattern_(other.bit_pattern_),
+base_buffer_(other.base_buffer_),
+levels_(other.levels_),
+min_value_(nullptr),
+max_value_(nullptr)
+{
+  if (other.min_value_ != nullptr) {
+    auto min_slot = allocator_.allocate(1);
+    min_value_ = new (min_slot) T(*other.min_value_);
+  }
+  if (other.max_value_ != nullptr) {
+    auto max_slot = allocator_.allocate(1);
+    max_value_ = new (max_slot) T(*other.max_value_);
+  }
+}
+
+// Move constructor: takes over the buffers and the min/max values of the other sketch.
+// The other sketch is left as a valid empty sketch (n = 0, no retained items), so that
+// it can be queried, updated or destroyed safely afterwards. Without the reset it would
+// report itself as non-empty while holding null min/max pointers.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>::quantiles_sketch(quantiles_sketch&& other) noexcept:
+allocator_(other.allocator_),
+k_(other.k_),
+n_(other.n_),
+bit_pattern_(other.bit_pattern_),
+base_buffer_(std::move(other.base_buffer_)),
+levels_(std::move(other.levels_)),
+min_value_(other.min_value_),
+max_value_(other.max_value_)
+{
+  other.min_value_ = nullptr;
+  other.max_value_ = nullptr;
+  // keep the moved-from sketch consistent: empty counters and empty containers
+  other.n_ = 0;
+  other.bit_pattern_ = 0;
+  other.base_buffer_.clear();
+  other.levels_.clear();
+}
+
+// Copy assignment: builds a full copy of the other sketch and exchanges state with it.
+// The previous state of this sketch is released when the temporary copy is destroyed.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>& quantiles_sketch<T, C, S, A>::operator=(const quantiles_sketch& other) {
+  quantiles_sketch<T, C, S, A> replacement(other);
+  // the move assignment swaps every field with its argument
+  return *this = std::move(replacement);
+}
+
+// Move assignment: exchanges every field with the other sketch.
+// After the call the other sketch holds the previous state of this sketch.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>& quantiles_sketch<T, C, S, A>::operator=(quantiles_sketch&& other) noexcept {
+  std::swap(allocator_, other.allocator_);
+  std::swap(k_, other.k_);
+  std::swap(n_, other.n_);
+  std::swap(bit_pattern_, other.bit_pattern_);
+  std::swap(base_buffer_, other.base_buffer_);
+  std::swap(levels_, other.levels_);
+  std::swap(min_value_, other.min_value_);
+  std::swap(max_value_, other.max_value_);
+  return *this;
+}
+
+
+// Destructor: destroys and deallocates the separately allocated min and max values, if any.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>::~quantiles_sketch() {
+  T* const owned[2] = { min_value_, max_value_ };
+  for (T* ptr : owned) {
+    if (ptr == nullptr) continue;
+    ptr->~T();
+    allocator_.deallocate(ptr, 1);
+  }
+}
+
+// Adds one item to the sketch.
+// Items rejected by check_update_value() are ignored. Otherwise min/max are maintained,
+// the item is appended to the base buffer, and a full base buffer (2k items) is processed.
+template<typename T, typename C, typename S, typename A>
+template<typename FwdT>
+void quantiles_sketch<T, C, S, A>::update(FwdT&& item) {
+  if (!check_update_value(item)) { return; }
+
+  if (!is_empty()) {
+    if (C()(item, *min_value_)) *min_value_ = item;
+    if (C()(*max_value_, item)) *max_value_ = item;
+  } else {
+    // first item: it is both the minimum and the maximum
+    min_value_ = new (allocator_.allocate(1)) T(item);
+    max_value_ = new (allocator_.allocate(1)) T(item);
+  }
+
+  // if exceed capacity, grow until size 2k -- assumes eager processing
+  const bool buffer_is_full = base_buffer_.size() >= base_buffer_.capacity();
+  if (buffer_is_full) {
+    grow_base_buffer();
+  }
+
+  base_buffer_.push_back(std::forward<FwdT>(item));
+  ++n_;
+
+  if (base_buffer_.size() == 2 * k_) {
+    process_full_base_buffer();
+  }
+}
+
+// Produces a human-readable description of the sketch.
+//   print_levels: if true, also lists how many items the base buffer and each level hold
+//   print_items:  if true, also lists every item of the base buffer and of each level
+// Items are written with stream insertion (operator<<), the same way as the min and max
+// values, so any item type that can be streamed is supported.
+template<typename T, typename C, typename S, typename A>
+string<A> quantiles_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
+  // The stream does not support passing an allocator instance, and alternatives are complicated.
+  std::ostringstream os;
+  const std::streamsize default_precision = os.precision();
+  os << "### Quantiles Sketch summary:" << std::endl;
+  os << "   K              : " << k_ << std::endl;
+  os << "   N              : " << n_ << std::endl;
+  os << "   Epsilon        : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
+  os << "   Epsilon PMF    : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
+  os << "   Empty          : " << (is_empty() ? "true" : "false") << std::endl;
+  os << "   Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
+  os << "   Levels (w/o BB): " << levels_.size() << std::endl;
+  os << "   Used Levels    : " << compute_valid_levels(bit_pattern_) << std::endl;
+  os << "   Retained items : " << get_num_retained() << std::endl;
+  os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
+  if (!is_empty()) {
+    os << "   Min value      : " << *min_value_ << std::endl;
+    os << "   Max value      : " << *max_value_ << std::endl;
+  }
+  os << "### End sketch summary" << std::endl;
+
+  if (print_levels) {
+    os << "### Quantiles Sketch levels:" << std::endl;
+    os << "   index: items in use" << std::endl;
+    os << "   BB: " << base_buffer_.size() << std::endl;
+    for (uint8_t i = 0; i < levels_.size(); i++) {
+      os << "   " << static_cast<unsigned int>(i) << ": " << levels_[i].size() << std::endl;
+    }
+    os << "### End sketch levels" << std::endl;
+  }
+
+  if (print_items) {
+    // the summary lowered the precision for the error percentages;
+    // restore the stream default so that floating point items are not truncated to 3 digits
+    os.precision(default_precision);
+    os << "### Quantiles Sketch data:" << std::endl;
+    os << " BB:" << std::endl;
+    for (const T& item : base_buffer_) {
+      os << "    " << item << std::endl;
+    }
+    for (uint8_t i = 0; i < levels_.size(); ++i) {
+      // label each section with the index of the level actually being printed
+      os << " level " << static_cast<unsigned int>(i) << ":" << std::endl;
+      for (const T& item : levels_[i]) {
+        os << "   " << item << std::endl;
+      }
+    }
+    os << "### End sketch data" << std::endl;
+  }
+  return string<A>(os.str().c_str(), allocator_);
+}
+
+// Returns the parameter k this sketch was configured with.
+template<typename T, typename C, typename S, typename A>
+uint16_t quantiles_sketch<T, C, S, A>::get_k() const {
+  return k_;
+}
+
+// Returns n, the counter that update() increments for every accepted item.
+template<typename T, typename C, typename S, typename A>
+uint64_t quantiles_sketch<T, C, S, A>::get_n() const {
+  return n_;
+}
+
+// Returns true if n is zero.
+template<typename T, typename C, typename S, typename A>
+bool quantiles_sketch<T, C, S, A>::is_empty() const {
+  return n_ == 0;
+}
+
+// Returns true if the level bit pattern is non-zero, i.e. at least one level above
+// the base buffer is marked as in use.
+template<typename T, typename C, typename S, typename A>
+bool quantiles_sketch<T, C, S, A>::is_estimation_mode() const {
+  return bit_pattern_ != 0;
+}
+
+// Returns the number of retained items, computed from k and n
+// (not by counting the stored items).
+template<typename T, typename C, typename S, typename A>
+uint32_t quantiles_sketch<T, C, S, A>::get_num_retained() const {
+  return compute_retained_items(k_, n_);
+}
+
+// Returns the smallest item seen so far.
+// For an empty sketch the result of get_invalid_value() is returned instead
+// (which throws for item types that are not floating point).
+template<typename T, typename C, typename S, typename A>
+const T& quantiles_sketch<T, C, S, A>::get_min_value() const {
+  if (!is_empty()) {
+    return *min_value_;
+  }
+  return get_invalid_value();
+}
+
+// Returns the largest item seen so far.
+// For an empty sketch the result of get_invalid_value() is returned instead
+// (which throws for item types that are not floating point).
+template<typename T, typename C, typename S, typename A>
+const T& quantiles_sketch<T, C, S, A>::get_max_value() const {
+  if (!is_empty()) {
+    return *max_value_;
+  }
+  return get_invalid_value();
+}
+
+// Serialized size for fixed-size arithmetic item types (integral and floating point).
+// A non-empty sketch accounts for every retained item plus two more (min and max),
+// each taking sizeof(TT) bytes, after the data start offset.
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t quantiles_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (!is_empty()) {
+    const uint32_t num_items = get_num_retained() + 2; // retained items + min + max
+    return DATA_START + (num_items * sizeof(TT));
+  }
+  return EMPTY_SIZE_BYTES;
+}
+
+// Serialized size for all non-arithmetic item types.
+// The size of each item is obtained from the serde S: min, max and every item
+// visited by iterating over the sketch are added to the data start offset.
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t quantiles_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (is_empty()) { return EMPTY_SIZE_BYTES; }
+  size_t total_bytes = DATA_START;
+  total_bytes += S().size_of_item(*min_value_);
+  total_bytes += S().size_of_item(*max_value_);
+  for (auto item_and_weight: *this) {
+    total_bytes += S().size_of_item(item_and_weight.first);
+  }
+  return total_bytes;
+}
+
+// Returns the normalized rank error for this sketch's k.
+// is_pmf selects the PMF variant of the error formula.
+template<typename T, typename C, typename S, typename A>
+double quantiles_sketch<T, C, S, A>::get_normalized_rank_error(bool is_pmf) const {
+  return get_normalized_rank_error(k_, is_pmf);
+}
+
+template<typename T, typename C, typename S, typename A>
+double quantiles_sketch<T, C, S, A>::get_normalized_rank_error(uint16_t k, bool is_pmf) {
+  return is_pmf
+      ? 1.854 / std::pow(k, 0.9657)
+      : 1.576 / std::pow(k, 0.9726);
+}
+
+// Deleter for quantile calculator objects: runs the destructor and returns the memory
+// to the allocator that is stored in the deleter. Null pointers are ignored.
+template<typename T, typename C, typename S, typename A>
+class quantiles_sketch<T, C, S, A>::calculator_deleter {
+  public:
+  calculator_deleter(const AllocCalc& allocator): allocator_(allocator) {}
+  void operator() (QuantileCalculator* ptr) {
+    if (ptr == nullptr) { return; }
+    ptr->~QuantileCalculator();
+    allocator_.deallocate(ptr, 1);
+  }
+  private:
+  AllocCalc allocator_;
+};
+
+// Builds a quantile calculator from the current contents of the sketch.
+// The base buffer is added with log2-weight 0 and level i with log2-weight i + 1;
+// empty levels are skipped. The weights are then converted to cumulative form.
+// Side effect: the base buffer is sorted in place even though the method is const.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto quantiles_sketch<T, C, S, A>::get_quantile_calculator() const -> QuantileCalculatorPtr {
+  // allow side effect of sorting the base buffer
+  Level& mutable_bb = const_cast<Level&>(base_buffer_);
+  std::sort(mutable_bb.begin(), mutable_bb.end(), C());
+
+  AllocCalc ac(allocator_);
+  QuantileCalculatorPtr quantile_calculator(
+    new (ac.allocate(1)) req_quantile_calculator<T, C, A>(n_, ac),
+    calculator_deleter(ac)
+  );
+
+  uint64_t lg_weight = 0;
+  const T* bb_begin = base_buffer_.data();
+  quantile_calculator->add(bb_begin, bb_begin + base_buffer_.size(), lg_weight);
+
+  for (size_t i = 0; i < levels_.size(); ++i) {
+    ++lg_weight; // advances for every level, used or not
+    const Level& level = levels_[i];
+    if (!level.empty()) {
+      quantile_calculator->add(level.data(), level.data() + k_, lg_weight);
+    }
+  }
+  quantile_calculator->template convert_to_cummulative<inclusive>();
+  return quantile_calculator;
+}
+
+// Returns the item at the given normalized rank.
+// An empty sketch yields get_invalid_value(); rank 0 yields the min value and rank 1 the
+// max value. Throws std::invalid_argument if the rank is below 0 or above 1.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+const T& quantiles_sketch<T, C, S, A>::get_quantile(double rank) const {
+  if (is_empty()) { return get_invalid_value(); }
+  if (rank == 0.0) { return *min_value_; }
+  if (rank == 1.0) { return *max_value_; }
+
+  const bool below_range = rank < 0.0;
+  const bool above_range = rank > 1.0;
+  if (below_range || above_range) {
+    throw std::invalid_argument("Rank cannot be less than zero or greater than 1.0");
+  }
+
+  auto calculator = get_quantile_calculator<inclusive>();
+  return *(calculator->get_quantile(rank));
+}
+
+// Returns the items at the given normalized ranks, in the same order.
+// An empty sketch yields an empty vector. Rank 0 maps to the min value and rank 1 to the
+// max value; for any other rank a quantile calculator is built once, on first need.
+// Throws std::invalid_argument if a rank is below 0 or above 1.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+std::vector<T, A> quantiles_sketch<T, C, S, A>::get_quantiles(const double* ranks, uint32_t size) const {
+  std::vector<T, A> quantiles(allocator_);
+  if (is_empty()) return quantiles;
+  QuantileCalculatorPtr quantile_calculator(nullptr, calculator_deleter(allocator_));
+  quantiles.reserve(size);
+  for (uint32_t i = 0; i < size; ++i) {
+    const double rank = ranks[i];
+    if ((rank < 0.0) || (rank > 1.0)) {
+      throw std::invalid_argument("rank cannot be less than zero or greater than 1.0");
+    }
+    if (rank == 0.0) {
+      quantiles.push_back(*min_value_);
+      continue;
+    }
+    if (rank == 1.0) {
+      quantiles.push_back(*max_value_);
+      continue;
+    }
+    if (!quantile_calculator) {
+      // has side effect of sorting level zero if needed
+      quantile_calculator = get_quantile_calculator<inclusive>();
+    }
+    quantiles.push_back(*(quantile_calculator->get_quantile(rank)));
+  }
+  return quantiles;
+}
+
+// Returns items at num evenly spaced normalized ranks from 0 to 1 inclusive
+// (only rank 0 when num is 1). An empty sketch yields an empty vector.
+// Throws std::invalid_argument if num is 0 and the sketch is not empty.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+std::vector<T, A> quantiles_sketch<T, C, S, A>::get_quantiles(uint32_t num) const {
+  if (is_empty()) return std::vector<T, A>(allocator_);
+  if (num == 0) {
+    throw std::invalid_argument("num must be > 0");
+  }
+  vector_double fractions(num, 0, allocator_);
+  fractions[0] = 0.0;
+  if (num > 1) {
+    for (size_t i = 1; i < num; i++) {
+      fractions[i] = static_cast<double>(i) / (num - 1);
+    }
+    // pin the last rank to exactly 1.0
+    fractions[num - 1] = 1.0;
+  }
+  return get_quantiles<inclusive>(fractions.data(), num);
+}
+
+// Returns the normalized rank of the given value: the total weight of the retained items
+// that are below the value (or not above it, in inclusive mode), divided by n.
+// Base buffer items have weight 1, level 0 items weight 2, and the weight doubles per level.
+// An empty sketch yields NaN.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+double quantiles_sketch<T, C, S, A>::get_rank(const T& value) const {
+  if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
+
+  // decides whether an item contributes its weight to the rank of value
+  auto counts_toward_rank = [&value](const T& item) {
+    return inclusive ? !C()(value, item) : C()(item, value);
+  };
+
+  uint64_t total = 0;
+  uint64_t weight = 1;
+  // the base buffer is not necessarily sorted, so every item is examined
+  for (const T& item: base_buffer_) {
+    if (counts_toward_rank(item)) {
+      total += weight;
+    }
+  }
+
+  weight *= 2;
+  for (uint8_t level = 0; level < levels_.size(); ++level, weight *= 2) {
+    if (levels_[level].empty()) { continue; }
+    const T* data = levels_[level].data();
+    // levels are sorted, no point comparing further after the first item that does not count
+    for (uint16_t i = 0; i < k_ && counts_toward_rank(data[i]); ++i) {
+      total += weight;
+    }
+  }
+  return static_cast<double>(total) / n_;
+}
+
+// Returns size + 1 bucket masses for the given split points, obtained by differencing
+// the CDF values in place from the last bucket down to the second one.
+// An empty sketch yields whatever get_CDF returns for it (an empty vector).
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto quantiles_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const -> vector_double {
+  auto buckets = get_CDF<inclusive>(split_points, size);
+  if (is_empty()) return buckets;
+  // go backwards so that each subtraction still sees the cumulative value below it
+  uint32_t i = size;
+  while (i != 0) {
+    buckets[i] -= buckets[i - 1];
+    --i;
+  }
+  return buckets;
+}
+
+// Returns size + 1 cumulative values: the rank of each split point followed by 1.
+// An empty sketch yields an empty vector; the split points are validated otherwise.
+template<typename T, typename C, typename S, typename A>
+template<bool inclusive>
+auto quantiles_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const -> vector_double {
+  vector_double buckets(allocator_);
+  if (!is_empty()) {
+    check_split_points(split_points, size);
+    buckets.reserve(size + 1);
+    for (uint32_t i = 0; i < size; ++i) {
+      buckets.push_back(get_rank<inclusive>(split_points[i]));
+    }
+    buckets.push_back(1);
+  }
+  return buckets;
+}
+
+// Computes the number of retained items from k and n:
+// the base buffer count plus k items for every level marked in the bit pattern.
+template<typename T, typename C, typename S, typename A>
+uint32_t quantiles_sketch<T, C, S, A>::compute_retained_items(const uint16_t k, const uint64_t n) {
+  const uint32_t items_in_base_buffer = compute_base_buffer_items(k, n);
+  const uint32_t levels_in_use = compute_valid_levels(compute_bit_pattern(k, n));
+  return items_in_base_buffer + (k * levels_in_use);
+}
+
+// Returns n modulo 2k: the number of items that do not fill a whole base buffer of 2k items.
+template<typename T, typename C, typename S, typename A>
+uint32_t quantiles_sketch<T, C, S, A>::compute_base_buffer_items(const uint16_t k, const uint64_t n) {
+  return n % (static_cast<uint64_t>(2) * k);
+}
+
+// Returns n divided by 2k (integer division); the bits of the result are used as
+// the per-level "in use" flags.
+template<typename T, typename C, typename S, typename A>
+uint64_t quantiles_sketch<T, C, S, A>::compute_bit_pattern(const uint16_t k, const uint64_t n) {
+  return n / (static_cast<uint64_t>(2) * k);
+}
+
+// Returns the number of bits set in the given bit pattern.
+template<typename T, typename C, typename S, typename A>
+uint32_t quantiles_sketch<T, C, S, A>::compute_valid_levels(const uint64_t bit_pattern) {
+  // TODO: Java's Long.bitCount() probably uses a better method
+  uint32_t count = 0;
+  // each step clears the lowest set bit, so the loop runs once per set bit
+  for (uint64_t remaining = bit_pattern; remaining != 0; remaining &= (remaining - 1)) {
+    ++count;
+  }
+  return count;
+}
+
+// Returns 64 minus the number of leading zero bits of n / (2k).
+// NOTE(review): this presumably equals the index of the highest level in use plus one,
+// and 0 when n < 2k -- that relies on count_leading_zeros_in_u64(0) returning 64; confirm.
+template<typename T, typename C, typename S, typename A>
+uint8_t quantiles_sketch<T, C, S, A>::compute_levels_needed(const uint16_t k, const uint64_t n) {
+  return static_cast<uint8_t>(64U) - count_leading_zeros_in_u64(n / (2 * k));
+}
+
+
+// Returns an iterator positioned at the first retained item.
+template <typename T, typename C, typename S, typename A>
+typename quantiles_sketch<T, C, S, A>::const_iterator quantiles_sketch<T, C, S, A>::begin() const {
+  return quantiles_sketch<T, C, S, A>::const_iterator(base_buffer_, levels_, k_, n_, false);
+}
+
+// Returns the past-the-end iterator.
+template <typename T, typename C, typename S, typename A>
+typename quantiles_sketch<T, C, S, A>::const_iterator quantiles_sketch<T, C, S, A>::end() const {
+  return quantiles_sketch<T, C, S, A>::const_iterator(base_buffer_, levels_, k_, n_, true);
+}
+
+// Reserves more room in the base buffer: twice the current number of items,
+// but no more than 2k and no less than 1.
+template<typename T, typename C, typename S, typename A>
+void quantiles_sketch<T, C, S, A>::grow_base_buffer() {
+  const size_t upper_limit = static_cast<size_t>(2 * k_);
+  const size_t doubled = 2 * base_buffer_.size();
+  size_t target = (doubled < upper_limit) ? doubled : upper_limit;
+  if (target < static_cast<size_t>(1)) {
+    target = static_cast<size_t>(1);
+  }
+  base_buffer_.reserve(target);
+}
+
+// Handles a base buffer that has reached 2k items: makes sure the required levels exist,
+// sorts the buffer, propagates it into the levels as an update, and empties the buffer.
+template<typename T, typename C, typename S, typename A>
+void quantiles_sketch<T, C, S, A>::process_full_base_buffer() {
+  // make sure there will be enough levels for the propagation
+  // note: n_ was already incremented by update() before this
+  grow_levels_if_needed();
+
+  std::sort(base_buffer_.begin(), base_buffer_.end(), C());
+
+  // the size-k buffer argument is unused for updates, but level 0 is guaranteed to exist
+  Level& unused_size_k_buffer = levels_[0];
+  in_place_propagate_carry(0, unused_size_k_buffer, base_buffer_, true, *this);
+
+  base_buffer_.clear();
+  assert(n_ / (2 * k_) == bit_pattern_);  // internal consistency check
+}
+
+// Appends one empty level (with room reserved for k items) if the number of levels needed
+// for the current k and n exceeds the number of existing levels.
+// Returns true if a level was added.
+template<typename T, typename C, typename S, typename A>
+bool quantiles_sketch<T, C, S, A>::grow_levels_if_needed() {
+  const uint8_t levels_needed = compute_levels_needed(k_, n_);
+
+  // zero levels needed: might have small base buffer, possible during merges;
+  // otherwise assume full size base buffer (2k) and check whether the levels already suffice
+  const bool nothing_to_add = (levels_needed == 0) || (levels_needed <= levels_.size());
+  if (nothing_to_add) {
+    return false;
+  }
+
+  Level new_level(allocator_);
+  new_level.reserve(k_);
+  levels_.push_back(std::move(new_level));
+  return true;
+}
+
+// Propagates a buffer of items up through the levels of the given sketch, in the manner
+// of a binary ripple carry on the sketch's bit pattern.
+//   starting_level:  level at which the propagation starts
+//   buf_size_k:      source of k items, read only when apply_as_update is false
+//   buf_size_2k:     buffer of 2k items; source of the update and scratch space for merging
+//   apply_as_update: true for the update path, false for the merge path
+//   sketch:          the sketch whose levels and bit pattern are modified
+template<typename T, typename C, typename S, typename A>
+void quantiles_sketch<T, C, S, A>::in_place_propagate_carry(uint8_t starting_level,
+                                                            Level& buf_size_k, Level& buf_size_2k, 
+                                                            bool apply_as_update,
+                                                            quantiles_sketch<T,C,S,A>& sketch) {
+  const uint64_t bit_pattern = sketch.bit_pattern_;
+  const int k = sketch.k_;
+
+  // the carry stops at the first level, at or above starting_level, whose bit is zero
+  uint8_t ending_level = lowest_zero_bit_starting_at(bit_pattern, starting_level);
+
+  if (apply_as_update) {
+    // update version of computation
+    // it is okay for buf_size_k to be unused in this case
+    zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
+  } else {
+    // merge_into version of computation
+    // NOTE(review): this writes through the element addresses of the destination level, which
+    // assumes that level already holds k elements; zip_buffer fills levels with push_back
+    // instead -- confirm the destination is sized before this path is used.
+    std::move(&buf_size_k[0], &buf_size_k[0] + k, &sketch.levels_[ending_level][0]);
+  }
+
+  // fold every level below ending_level into ending_level
+  for (uint64_t lvl = starting_level; lvl < ending_level; lvl++) {
+    assert((bit_pattern & (static_cast<uint64_t>(1) << lvl)) > 0); // internal consistency check
+    merge_two_size_k_buffers(
+        sketch.levels_[lvl],
+        sketch.levels_[ending_level],
+        buf_size_2k);
+    sketch.levels_[lvl].clear();
+    sketch.levels_[ending_level].clear();
+    zip_buffer(buf_size_2k, sketch.levels_[ending_level]);
+    // to release the discarded objects
+    // (the commented line below is the corresponding statement of the Java implementation)
+    //Arrays.fill(levelsArr, (2 + lvl) * k, (2 + lvl + 1) * k, null);
+    sketch.levels_[lvl].clear();
+  } // end of loop over lower levels
+
+  // update bit pattern with binary-arithmetic ripple carry
+  sketch.bit_pattern_ = bit_pattern + (static_cast<uint64_t>(1) << starting_level);
+}
+
+// Moves every other item of buf_in into buf_out, starting at offset 0 or 1, and then
+// empties buf_in. The offset is random, or alternates deterministically when
+// QUANTILES_VALIDATION is defined.
+//   buf_in:  source buffer with an even number of items (2k)
+//   buf_out: destination buffer, expected to be empty; receives k items
+// The number of items to keep is derived from the size of buf_in rather than from the
+// capacity of buf_out: capacity is only guaranteed to be at least what was reserved, and
+// need not be preserved when a level is copied along with its sketch.
+template<typename T, typename C, typename S, typename A>
+void quantiles_sketch<T, C, S, A>::zip_buffer(Level& buf_in, Level& buf_out) {
+#ifdef QUANTILES_VALIDATION
+  static uint32_t next_offset = 0;
+  uint32_t rand_offset = next_offset;
+  next_offset = 1 - next_offset;
+#else
+  uint32_t rand_offset = random_bit();
+#endif
+  assert(buf_in.size() % 2 == 0);
+  const size_t k = buf_in.size() / 2;
+  buf_out.reserve(buf_out.size() + k);
+  for (size_t i = rand_offset, o = 0; o < k; i += 2, ++o) {
+    buf_out.push_back(std::move(buf_in[i]));
+  }
+  buf_in.clear();
+}
+
+// Merges two sorted buffers of equal size into dst, preserving sort order.
+// Items are moved out of the sources and appended to dst; on ties the item from
+// src_1 goes first.
+//   src_1, src_2: sorted (with respect to C) source buffers of the same size
+//   dst:          destination buffer that the merged items are appended to
+template<typename T, typename C, typename S, typename A>
+void quantiles_sketch<T, C, S, A>::merge_two_size_k_buffers(Level& src_1, Level& src_2, Level& dst) {
+  assert(src_1.size() == src_2.size());
+  dst.reserve(dst.size() + src_1.size() + src_2.size());
+
+  auto end1 = src_1.end(), end2 = src_2.end();
+  auto it1 = src_1.begin(), it2 = src_2.begin();
+
+  // C is a strict-weak-ordering predicate returning bool, so its result is used directly:
+  // take from src_2 only when its item is strictly smaller
+  while (it1 != end1 && it2 != end2) {
+    if (C()(*it2, *it1)) {
+      dst.push_back(std::move(*it2++));
+    } else {
+      dst.push_back(std::move(*it1++));
+    }
+  }
+
+  // at most one of the sources still has items; move the remainder
+  for (; it1 != end1; ++it1) {
+    dst.push_back(std::move(*it1));
+  }
+  for (; it2 != end2; ++it2) {
+    dst.push_back(std::move(*it2));
+  }
+}
+
+// Returns the position of the lowest zero bit of bits at or above starting_bit.
+// Only the low 6 bits of starting_bit are used.
+template<typename T, typename C, typename S, typename A>
+int quantiles_sketch<T, C, S, A>::lowest_zero_bit_starting_at(uint64_t bits, uint8_t starting_bit) {
+  uint8_t pos = starting_bit & 0X3F;
+  // shift the candidate bit into the lowest position and advance while it is set
+  for (uint64_t remaining = bits >> pos; (remaining & static_cast<uint64_t>(1)) != 0; remaining >>= 1) {
+    pos++;
+  }
+  return pos;
+}
+
+
+
+// quantiles_sketch::const_iterator implementation
+
+// Creates an iterator over the given base buffer and levels.
+//   k, n:   sketch parameters, used to derive the base buffer count and the level bit pattern
+//   is_end: true to create the past-the-end iterator, false to create the begin iterator
+// Position encoding: level_ == -1 means the base buffer, otherwise level_ indexes levels_.
+template<typename T, typename C, typename S, typename A>
+quantiles_sketch<T, C, S, A>::const_iterator::const_iterator(const Level& base_buffer,
+                                                             const std::vector<Level, AllocLevel>& levels,
+                                                             uint16_t k,
+                                                             uint64_t n,
+                                                             bool is_end):
+base_buffer_(base_buffer),
+levels_(levels),
+level_(-1),
+index_(0),
+k_(k),
+bb_count_(compute_base_buffer_items(k, n)),
+bit_pattern_(compute_bit_pattern(k, n)),
+weight_(1)
+{
+  if (is_end) {
+    // if exact mode: index_ = n is end
+    // if sampling, level_ = max_level + 1 and index_ = 0 is end
+    if (bit_pattern_ == 0) // only a valid check for exact mode in constructor
+      index_ = static_cast<uint32_t>(n);
+    else
+      level_ = levels_.size();
+  } else { // find first non-empty item
+    // an empty base buffer with levels in use: start at the lowest level whose bit is set
+    if (bb_count_ == 0 && bit_pattern_ > 0) {
+      level_ = 0;
+      weight_ = 2;
+      // bit_pattern_ is shifted along, so that its lowest bit always describes level_
+      while ((bit_pattern_ & 0x01) == 0) {
+        weight_ *= 2;
+        ++level_;
+        bit_pattern_ >>= 1;
+      }
+    }
+  }
+}
+
+// Prefix increment: advances to the next item.
+// When the end of the base buffer (and levels exist) or the end of a level (k items) is
+// reached, the iterator moves to index 0 of the next level whose bit is set, doubling the
+// weight for every level it passes. If no bits remain it stops right after the last level.
+template<typename T, typename C, typename S, typename A>
+typename quantiles_sketch<T, C, S, A>::const_iterator& quantiles_sketch<T, C, S, A>::const_iterator::operator++() {
+  ++index_;
+
+  if ((level_ == -1 && index_ == base_buffer_.size() && levels_.size() > 0) || (level_ >= 0 && index_ == k_)) { // go to the next non-empty level
+    index_ = 0;
+    do {
+      ++level_;
+      // when leaving the base buffer (level_ becomes 0) the lowest bit already describes level 0
+      if (level_ > 0) bit_pattern_ = bit_pattern_ >> 1;
+      // no more levels in use: this is the past-the-end position
+      if (bit_pattern_ == 0) return *this;
+      weight_ *= 2;
+    } while ((bit_pattern_ & static_cast<uint64_t>(1)) == 0);
+  }
+  return *this;
+}
+
+// Postfix increment: copies the current iterator, advances this one, and returns the copy.
+// NOTE(review): the return type is a reference, but the object returned is the local `tmp`,
+// which is destroyed when the function returns, so callers receive a dangling reference.
+// The declaration and this definition should presumably both return const_iterator by
+// value; they have to be changed together.
+template<typename T, typename C, typename S, typename A>
+typename quantiles_sketch<T, C, S, A>::const_iterator& quantiles_sketch<T, C, S, A>::const_iterator::operator++(int) {
+  const_iterator tmp(*this);
+  operator++();
+  return tmp;
+}
+
+// Two iterators are equal if they are at the same level and the same index.
+// Only the position is compared, not which buffers the iterators refer to.
+template<typename T, typename C, typename S, typename A>
+bool quantiles_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
+  return level_ == other.level_ && index_ == other.index_;
+}
+
+// Negation of operator==.
+template<typename T, typename C, typename S, typename A>
+bool quantiles_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
+  return !operator==(other);
+}
+
+// Returns the item at the current position together with its weight.
+// The item is taken from the base buffer while level_ is -1, otherwise from the current level.
+template<typename T, typename C, typename S, typename A>
+std::pair<const T&, const uint64_t> quantiles_sketch<T, C, S, A>::const_iterator::operator*() const {
+  const bool in_base_buffer = (level_ == -1);
+  const T& item = in_base_buffer ? base_buffer_[index_] : levels_[level_][index_];
+  return std::pair<const T&, const uint64_t>(item, weight_);
+}
+
+} /* namespace datasketches */
+
+#endif // _QUANTILES_SKETCH_IMPL_HPP_
\ No newline at end of file
diff --git a/quantiles/test/CMakeLists.txt b/quantiles/test/CMakeLists.txt
new file mode 100644
index 0000000..c63a5a1
--- /dev/null
+++ b/quantiles/test/CMakeLists.txt
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Test executable for the quantiles sketch; its sources are attached at the end of this file.
+add_executable(quantiles_test)
+
+target_link_libraries(quantiles_test quantiles common_test)
+
+# The tests are built as C++11.
+set_target_properties(quantiles_test PROPERTIES
+  CXX_STANDARD 11
+  CXX_STANDARD_REQUIRED YES
+)
+
+# Pass this source directory (with a trailing slash) to the tests as TEST_BINARY_INPUT_PATH.
+file(TO_CMAKE_PATH "${CMAKE_CURRENT_SOURCE_DIR}" QUANTILES_TEST_BINARY_PATH)
+string(APPEND QUANTILES_TEST_BINARY_PATH "/")
+target_compile_definitions(quantiles_test
+  PRIVATE
+    TEST_BINARY_INPUT_PATH="${QUANTILES_TEST_BINARY_PATH}"
+)
+
+# Register the executable with CTest.
+add_test(
+  NAME quantiles_test
+  COMMAND quantiles_test
+)
+
+# Only quantiles_sketch_test.cpp is compiled for now; the remaining sources are commented out.
+target_sources(quantiles_test
+  PRIVATE
+    quantiles_sketch_test.cpp
+    #quantiles_sketch_custom_type_test.cpp
+    #quantiles_sketch_validation.cpp
+    #kolmogorov_smirnov_test.cpp
+)
diff --git a/quantiles/test/quantiles_sketch_test.cpp b/quantiles/test/quantiles_sketch_test.cpp
new file mode 100644
index 0000000..c660ef0
--- /dev/null
+++ b/quantiles/test/quantiles_sketch_test.cpp
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <catch.hpp>
+#include <cmath>
+#include <cstring>
+#include <sstream>
+#include <fstream>
+
+#include <quantiles_sketch.hpp>
+#include <test_allocator.hpp>
+
+namespace datasketches {
+
+static const double RANK_EPS_FOR_K_128 = 0.01725; // margin for the rank checks on the k=128 sketch in estimation mode
+static const double NUMERIC_NOISE_TOLERANCE = 1E-6; // allowed gap between a CDF value and the accumulated PMF
+
+#ifdef TEST_BINARY_INPUT_PATH // defined by quantiles/test/CMakeLists.txt
+static std::string testBinaryInputPath = TEST_BINARY_INPUT_PATH;
+#else
+static std::string testBinaryInputPath = "test/";
+#endif
+
+// typical usage would be just quantiles_sketch<float> or quantiles_sketch<std::string>, but here we use test_allocator
+using quantiles_float_sketch = quantiles_sketch<float, std::less<float>, serde<float>, test_allocator<float>>;
+// same for std::string items: the stock std::less and serde are kept, only the allocator is replaced by test_allocator
+using quantiles_string_sketch = quantiles_sketch<std::string, std::less<std::string>, serde<std::string>, test_allocator<std::string>>;
+
+TEST_CASE("quantiles sketch", "[quantiles_sketch]") {
+
+  // setup: zero the byte counter that the cleanup check at the end of this test case compares against 0
+  test_allocator_total_bytes = 0;
+
+  SECTION("k limits") { // k is accepted at MIN_K and MAX_K and must be rejected below MIN_K
+    quantiles_float_sketch sketch1(quantiles_constants::MIN_K, 0); // this should work
+    quantiles_float_sketch sketch2(quantiles_constants::MAX_K, 0); // this should work
+    REQUIRE_THROWS_AS(quantiles_float_sketch(quantiles_constants::MIN_K - 1, 0), std::invalid_argument); // temporary, not "new": no leak if it ever fails to throw
+    // MAX_K + 1 makes no sense because k is uint16_t
+  }
+
+  SECTION("empty") { // a sketch that never received an update
+    quantiles_float_sketch sketch(128, 0);
+    REQUIRE(sketch.is_empty());
+    REQUIRE_FALSE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_n() == 0);
+    REQUIRE(sketch.get_num_retained() == 0);
+    REQUIRE(std::isnan(sketch.get_rank(0))); // single-value queries on the empty float sketch are expected to yield NaN
+    REQUIRE(std::isnan(sketch.get_min_value()));
+    REQUIRE(std::isnan(sketch.get_max_value()));
+    REQUIRE(std::isnan(sketch.get_quantile(0.5)));
+    const double fractions[3] {0, 0.5, 1};
+    REQUIRE(sketch.get_quantiles(fractions, 3).empty()); // bulk queries are expected to come back as empty containers
+    const float split_points[1] {0};
+    REQUIRE(sketch.get_PMF(split_points, 1).empty());
+    REQUIRE(sketch.get_CDF(split_points, 1).empty());
+
+    for (auto it: sketch) { // the loop body must never run
+      (void) it; // to suppress "unused" warning
+      FAIL("should be no iterations over an empty sketch");
+    }
+  }
+
+  SECTION("get bad quantile") { // a negative rank must be rejected with std::invalid_argument
+    quantiles_float_sketch sketch(200, 0);
+    sketch.update(0); // has to be non-empty to reach the check
+    REQUIRE_THROWS_AS(sketch.get_quantile(-1), std::invalid_argument);
+  }
+
+  SECTION("one item") { // after a single update every quantile is that one value
+    quantiles_float_sketch sketch(128, 0);
+    sketch.update(1.0f);
+    REQUIRE_FALSE(sketch.is_empty());
+    REQUIRE_FALSE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_n() == 1);
+    REQUIRE(sketch.get_num_retained() == 1);
+    REQUIRE(sketch.get_rank(1.0f) == 0.0); // expected: the item is not counted in the rank of its own value
+    REQUIRE(sketch.get_rank(2.0f) == 1.0); // expected: it is counted for any larger query value
+    REQUIRE(sketch.get_min_value() == 1.0);
+    REQUIRE(sketch.get_max_value() == 1.0);
+    REQUIRE(sketch.get_quantile(0.5) == 1.0);
+    const double fractions[3] {0, 0.5, 1};
+    auto quantiles = sketch.get_quantiles(fractions, 3);
+    REQUIRE(quantiles.size() == 3); // one result per requested fraction
+    REQUIRE(quantiles[0] == 1.0);
+    REQUIRE(quantiles[1] == 1.0);
+    REQUIRE(quantiles[2] == 1.0);
+
+    int count = 0; // number of entries visited by the iteration below
+    for (auto it: sketch) {
+      REQUIRE(it.second == 1); // .second is the weight of the entry
+      ++count;
+    }
+    REQUIRE(count == 1);
+  }
+
+  SECTION("NaN") { // NaN updates are expected to be ignored by the sketch
+    quantiles_float_sketch sketch(200, 0);
+    sketch.update(std::numeric_limits<float>::quiet_NaN());
+    REQUIRE(sketch.is_empty()); // the NaN must not have been counted
+
+    sketch.update(0);
+    sketch.update(std::numeric_limits<float>::quiet_NaN());
+    REQUIRE(sketch.get_n() == 1); // only update(0) counts
+  }
+
+
+  SECTION("sampling mode") { // small k and 16 * 2k + 1 updates
+    const uint16_t k = 10;
+    const uint32_t n = 16 * (2 * k) + 1;
+    quantiles_float_sketch sk(k, 0);
+    for (uint32_t i = 0; i < n; ++i) sk.update(static_cast<float>(i));
+    REQUIRE_FALSE(sk.is_empty()); // previously this section asserted nothing at all
+    REQUIRE(sk.get_n() == n); // every update must be counted, whatever ends up being retained
+  }
+
+  SECTION("many items, exact mode") { // n = 127 updates into a sketch constructed with k = n + 1
+    const uint32_t n = 127;
+    quantiles_float_sketch sketch(n + 1, 0);
+    for (uint32_t i = 0; i < n; i++) {
+      sketch.update(static_cast<float>(i));
+      REQUIRE(sketch.get_n() == i + 1);
+    }
+    REQUIRE_FALSE(sketch.is_empty());
+    REQUIRE_FALSE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_num_retained() == n); // every item is expected to be retained
+    REQUIRE(sketch.get_min_value() == 0.0);
+    REQUIRE(sketch.get_quantile(0) == 0.0);
+    REQUIRE(sketch.get_max_value() == n - 1);
+    REQUIRE(sketch.get_quantile(1) == n - 1);
+
+    uint32_t count = 0; // unsigned like n, so the comparison below does not mix signedness
+    for (auto it: sketch) {
+      REQUIRE(it.second == 1); // .second is the weight of the entry
+      ++count;
+    }
+    REQUIRE(count == n);
+
+    const double fractions[3] {0, 0.5, 1};
+    auto quantiles = sketch.get_quantiles(fractions, 3);
+    REQUIRE(quantiles.size() == 3);
+    REQUIRE(quantiles[0] == 0.0);
+    REQUIRE(quantiles[1] == static_cast<float>(n / 2));
+    REQUIRE(quantiles[2] == n - 1 );
+
+    for (uint32_t i = 0; i < n; i++) { // items are 0..n-1, so exactly i of them are below i
+      const double trueRank = static_cast<double>(i) / n;
+      REQUIRE(sketch.get_rank(static_cast<float>(i)) == trueRank);
+    }
+
+    // the alternative method must produce the same result
+    auto quantiles2 = sketch.get_quantiles(3);
+    REQUIRE(quantiles2.size() == 3);
+    REQUIRE(quantiles[0] == quantiles2[0]);
+    REQUIRE(quantiles[1] == quantiles2[1]);
+    REQUIRE(quantiles[2] == quantiles2[2]);
+  }
+
+  SECTION("10 items") { // the values 1..10, far fewer than k = 200
+    quantiles_float_sketch sketch(200, 0);
+    sketch.update(1.0f);
+    sketch.update(2.0f);
+    sketch.update(3.0f);
+    sketch.update(4.0f);
+    sketch.update(5.0f);
+    sketch.update(6.0f);
+    sketch.update(7.0f);
+    sketch.update(8.0f);
+    sketch.update(9.0f);
+    sketch.update(10.0f);
+    REQUIRE(sketch.get_quantile(0) == 1.0); // rank 0 is expected to give the smallest item
+    REQUIRE(sketch.get_quantile(0.5) == 6.0); // expected: rank 0.5 selects the 6th smallest of the 10 items
+    REQUIRE(sketch.get_quantile(0.99) == 10.0);
+    REQUIRE(sketch.get_quantile(1) == 10.0); // rank 1 is expected to give the largest item
+  }
+
+  SECTION("100 items") { // the values 0..99, fewer than k = 200
+    quantiles_float_sketch sketch(200, 0);
+    for (int i = 0; i < 100; ++i) sketch.update(static_cast<float>(i));
+    REQUIRE(sketch.get_quantile(0) == 0);
+    REQUIRE(sketch.get_quantile(0.01) == 1); // for the ranks below 1 the expected quantile is 100 * rank
+    REQUIRE(sketch.get_quantile(0.5) == 50);
+    REQUIRE(sketch.get_quantile(0.99) == 99.0);
+    REQUIRE(sketch.get_quantile(1) == 99.0); // rank 1 is expected to give the largest item
+  }
+
+  SECTION("many items, estimation mode") { // one million updates into a k = 128 sketch
+    quantiles_float_sketch sketch(128, 0);
+    const int n = 1000000;
+    for (int i = 0; i < n; ++i) {
+      sketch.update(static_cast<float>(i));
+      REQUIRE(sketch.get_n() == static_cast<uint64_t>(i + 1));
+    }
+    REQUIRE_FALSE(sketch.is_empty());
+    REQUIRE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_min_value() == 0.0); // min value is exact
+    REQUIRE(sketch.get_quantile(0) == 0.0); // min value is exact
+    REQUIRE(sketch.get_max_value() == n - 1); // max value is exact
+    REQUIRE(sketch.get_quantile(1) == n - 1); // max value is exact
+
+    // every input value must get a rank within RANK_EPS_FOR_K_128 of its true rank
+    for (int i = 0; i < n; ++i) {
+      const double trueRank = static_cast<double>(i) / n;
+      REQUIRE(sketch.get_rank(static_cast<float>(i)) == Approx(trueRank).margin(RANK_EPS_FOR_K_128));
+    }
+
+    // quantiles at every 0.1 percentage point: 0, 0.001, ..., 1
+    double fractions[1001];
+    double reverse_fractions[1001]; // same fractions in descending order: ordering must not matter
+    for (int i = 0; i < 1001; ++i) {
+      fractions[i] = static_cast<double>(i) / 1000;
+      reverse_fractions[1000 - i] = fractions[i];
+    }
+    auto quantiles = sketch.get_quantiles(fractions, 1001);
+    auto reverse_quantiles = sketch.get_quantiles(reverse_fractions, 1001);
+    float previous_quantile = 0;
+    for (int i = 0; i < 1001; ++i) {
+      // one query per fraction is expensive; done only to check equivalence, not advised for real code
+      const float quantile = sketch.get_quantile(fractions[i]);
+      REQUIRE(quantiles[i] == quantile);
+      REQUIRE(reverse_quantiles[1000 - i] == quantile);
+      REQUIRE(previous_quantile <= quantile);
+      previous_quantile = quantile;
+    }
+
+    std::cout << sketch.to_string();
+
+    uint32_t count = 0;
+    uint64_t total_weight = 0;
+    for (auto entry: sketch) { // entry.second is the weight of the entry
+      ++count;
+      total_weight += entry.second;
+    }
+    REQUIRE(count == sketch.get_num_retained());
+    REQUIRE(total_weight == sketch.get_n());
+  }
+
+  SECTION("consistency between get_rank and get_PMF/CDF") { // CDF[i] must equal get_rank and the running PMF sum
+    quantiles_float_sketch sketch(200, 0);
+    const int n = 1000;
+    float values[n]; // the inserted values, reused as split points below
+    for (int i = 0; i < n; i++) {
+      sketch.update(static_cast<float>(i));
+      values[i] = static_cast<float>(i);
+    }
+
+    const auto ranks(sketch.get_CDF(values, n));
+    const auto pmf(sketch.get_PMF(values, n));
+
+    double subtotal_pmf(0); // PMF mass accumulated up to and including split point i
+    for (int i = 0; i < n; i++) {
+      if (sketch.get_rank(values[i]) != ranks[i]) { // REQUIRE is only evaluated on a mismatch, after naming the index
+        std::cerr << "checking rank vs CDF for value " << i << std::endl;
+        REQUIRE(sketch.get_rank(values[i]) == ranks[i]);
+      }
+      subtotal_pmf += pmf[i];
+      if (std::abs(ranks[i] - subtotal_pmf) > NUMERIC_NOISE_TOLERANCE) { // std::abs: plain abs may pick int abs(int) and truncate to 0
+        std::cerr << "CDF vs PMF for value " << i << std::endl;
+        REQUIRE(ranks[i] == Approx(subtotal_pmf).margin(NUMERIC_NOISE_TOLERANCE));
+      }
+    }
+  }
+/*
+  SECTION("deserialize from java") {
+    std::ifstream is;
+    is.exceptions(std::ios::failbit | std::ios::badbit);
+    is.open(testBinaryInputPath + "kll_sketch_from_java.sk", std::ios::binary);
+    auto sketch = kll_float_sketch::deserialize(is, test_allocator<float>(0));
+    REQUIRE_FALSE(sketch.is_empty());
+    REQUIRE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_n() == 1000000);
+    REQUIRE(sketch.get_num_retained() == 614);
+    REQUIRE(sketch.get_min_value() == 0.0);
+    REQUIRE(sketch.get_max_value() == 999999.0);
+  }
+
+  SECTION("stream serialize deserialize empty") {
+    kll_float_sketch sketch(200, 0);
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(std::isnan(sketch2.get_min_value()));
+    REQUIRE(std::isnan(sketch2.get_max_value()));
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+  }
+
+  SECTION("bytes serialize deserialize empty") {
+    kll_float_sketch sketch(200, 0);
+    auto bytes = sketch.serialize();
+    auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(std::isnan(sketch2.get_min_value()));
+    REQUIRE(std::isnan(sketch2.get_max_value()));
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+  }
+
+  SECTION("stream serialize deserialize one item") {
+    kll_float_sketch sketch(200, 0);
+    sketch.update(1.0f);
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE_FALSE(sketch2.is_empty());
+    REQUIRE_FALSE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == 1);
+    REQUIRE(sketch2.get_num_retained() == 1);
+    REQUIRE(sketch2.get_min_value() == 1.0);
+    REQUIRE(sketch2.get_max_value() == 1.0);
+    REQUIRE(sketch2.get_quantile(0.5) == 1.0);
+    REQUIRE(sketch2.get_rank(1) == 0.0);
+    REQUIRE(sketch2.get_rank(2) == 1.0);
+  }
+
+  SECTION("bytes serialize deserialize one item") {
+    kll_float_sketch sketch(200, 0);
+    sketch.update(1.0f);
+    auto bytes = sketch.serialize();
+    REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+    REQUIRE_FALSE(sketch2.is_empty());
+    REQUIRE_FALSE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == 1);
+    REQUIRE(sketch2.get_num_retained() == 1);
+    REQUIRE(sketch2.get_min_value() == 1.0);
+    REQUIRE(sketch2.get_max_value() == 1.0);
+    REQUIRE(sketch2.get_quantile(0.5) == 1.0);
+    REQUIRE(sketch2.get_rank(1) == 0.0);
+    REQUIRE(sketch2.get_rank(2) == 1.0);
+  }
+
+  SECTION("deserialize one item v1") {
+    std::ifstream is;
+    is.exceptions(std::ios::failbit | std::ios::badbit);
+    is.open(testBinaryInputPath + "kll_sketch_float_one_item_v1.sk", std::ios::binary);
+    auto sketch = kll_float_sketch::deserialize(is, test_allocator<float>(0));
+    REQUIRE_FALSE(sketch.is_empty());
+    REQUIRE_FALSE(sketch.is_estimation_mode());
+    REQUIRE(sketch.get_n() == 1);
+    REQUIRE(sketch.get_num_retained() == 1);
+    REQUIRE(sketch.get_min_value() == 1.0);
+    REQUIRE(sketch.get_max_value() == 1.0);
+  }
+
+  SECTION("stream serialize deserialize three items") {
+    kll_float_sketch sketch(200, 0);
+    sketch.update(1.0f);
+    sketch.update(2.0f);
+    sketch.update(3.0f);
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE_FALSE(sketch2.is_empty());
+    REQUIRE_FALSE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == 3);
+    REQUIRE(sketch2.get_num_retained() == 3);
+    REQUIRE(sketch2.get_min_value() == 1.0);
+    REQUIRE(sketch2.get_max_value() == 3.0);
+  }
+
+  SECTION("bytes serialize deserialize three items") {
+    kll_float_sketch sketch(200, 0);
+    sketch.update(1.0f);
+    sketch.update(2.0f);
+    sketch.update(3.0f);
+    auto bytes = sketch.serialize();
+    REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+    REQUIRE_FALSE(sketch2.is_empty());
+    REQUIRE_FALSE(sketch2.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == 3);
+    REQUIRE(sketch2.get_num_retained() == 3);
+    REQUIRE(sketch2.get_min_value() == 1.0);
+    REQUIRE(sketch2.get_max_value() == 3.0);
+  }
+
+  SECTION("stream serialize deserialize many floats") {
+    kll_float_sketch sketch(200, 0);
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch.update(static_cast<float>(i));
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(s, test_allocator<float>(0));
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
+    REQUIRE(sketch2.get_rank(static_cast<float>(n)) == sketch.get_rank(static_cast<float>(n)));
+  }
+
+  SECTION("bytes serialize deserialize many floats") {
+    kll_float_sketch sketch(200, 0);
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch.update(static_cast<float>(i));
+    auto bytes = sketch.serialize();
+    REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_float_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
+    REQUIRE(sketch2.get_rank(static_cast<float>(n)) == sketch.get_rank(static_cast<float>(n)));
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
+  }
+
+  SECTION("bytes serialize deserialize many ints") {
+    kll_sketch<int> sketch;
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch.update(i);
+    auto bytes = sketch.serialize();
+    REQUIRE(bytes.size() == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_sketch<int>::deserialize(bytes.data(), bytes.size());
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
+    REQUIRE(sketch2.get_rank(n) == sketch.get_rank(n));
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 7), std::out_of_range);
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), 15), std::out_of_range);
+    REQUIRE_THROWS_AS(kll_sketch<int>::deserialize(bytes.data(), bytes.size() - 1), std::out_of_range);
+  }
+
+  SECTION("floor of log2 of fraction") {
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(0, 1) == 0);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(1, 2) == 0);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(2, 2) == 0);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(3, 2) == 0);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(4, 2) == 1);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(5, 2) == 1);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(6, 2) == 1);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(7, 2) == 1);
+    REQUIRE(kll_helper::floor_of_log2_of_fraction(8, 2) == 2);
+  }
+*/
+  SECTION("out of order split points, float") { // descending split points must be rejected
+    quantiles_float_sketch sketch(200, 0);
+    sketch.update(0); // has to be non-empty to reach the check
+    float split_points[2] = {1, 0}; // deliberately not in ascending order
+    REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 2), std::invalid_argument);
+  }
+
+  SECTION("out of order split points, int") { // same check on a default-constructed sketch of int
+    quantiles_sketch<int> sketch;
+    sketch.update(0); // has to be non-empty to reach the check
+    int split_points[2] = {1, 0}; // deliberately not in ascending order
+    REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 2), std::invalid_argument);
+  }
+
+  SECTION("NaN split point") { // a NaN split point must be rejected
+    quantiles_float_sketch sketch(200, 0);
+    sketch.update(0); // has to be non-empty to reach the check
+    float split_points[1] = {std::numeric_limits<float>::quiet_NaN()};
+    REQUIRE_THROWS_AS(sketch.get_CDF(split_points, 1), std::invalid_argument);
+  }
+/*
+  SECTION("merge") {
+    quantiles_float_sketch sketch1(128, 0);
+    quantiles_float_sketch sketch2(128, 0);
+    const int n = 10000;
+    for (int i = 0; i < n; i++) {
+      sketch1.update(static_cast<float>(i));
+      sketch2.update(static_cast<float>((2 * n) - i - 1));
+    }
+
+    REQUIRE(sketch1.get_min_value() == 0.0f);
+    REQUIRE(sketch1.get_max_value() == n - 1);
+    REQUIRE(sketch2.get_min_value() == n);
+    REQUIRE(sketch2.get_max_value() == 2.0f * n - 1);
+
+    sketch1.merge(sketch2);
+
+    REQUIRE_FALSE(sketch1.is_empty());
+    REQUIRE(sketch1.get_n() == 2 * n);
+    REQUIRE(sketch1.get_min_value() == 0.0f);
+    REQUIRE(sketch1.get_max_value() == 2.0f * n - 1);
+    REQUIRE(sketch1.get_quantile(0.5) == Approx(n).margin(n * RANK_EPS_FOR_K_128));
+  }
+
+  SECTION("merge lower k") {
+    quantiles_float_sketch sketch1(256, 0);
+    quantiles_float_sketch sketch2(128, 0);
+    const int n = 10000;
+    for (int i = 0; i < n; i++) {
+      sketch1.update(static_cast<float>(i));
+      sketch2.update(static_cast<float>((2 * n) - i - 1));
+    }
+
+    REQUIRE(sketch1.get_min_value() == 0.0f);
+    REQUIRE(sketch1.get_max_value() == n - 1);
+    REQUIRE(sketch2.get_min_value() == n);
+    REQUIRE(sketch2.get_max_value() == 2.0f * n - 1);
+
+    REQUIRE(sketch1.get_k() == 256);
+    REQUIRE(sketch2.get_k() == 128);
+
+    REQUIRE(sketch1.get_normalized_rank_error(false) < sketch2.get_normalized_rank_error(false));
+    REQUIRE(sketch1.get_normalized_rank_error(true) < sketch2.get_normalized_rank_error(true));
+
+    sketch1.merge(sketch2);
+
+    // sketch1 must get "contaminated" by the lower K in sketch2
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch1.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch1.get_normalized_rank_error(true));
+
+    REQUIRE_FALSE(sketch1.is_empty());
+    REQUIRE(sketch1.get_n() == 2 * n);
+    REQUIRE(sketch1.get_min_value() == 0.0f);
+    REQUIRE(sketch1.get_max_value() == 2.0f * n - 1);
+    REQUIRE(sketch1.get_quantile(0.5) == Approx(n).margin(n * RANK_EPS_FOR_K_200));
+  }
+
+  SECTION("merge exact mode, lower k") {
+    kll_float_sketch sketch1(256, 0);
+    kll_float_sketch sketch2(128, 0);
+    const int n = 10000;
+    for (int i = 0; i < n; i++) {
+      sketch1.update(static_cast<float>(i));
+    }
+
+    // rank error should not be affected by a merge with an empty sketch with lower k
+    const double rank_error_before_merge = sketch1.get_normalized_rank_error(true);
+    sketch1.merge(sketch2);
+    REQUIRE(sketch1.get_normalized_rank_error(true) == rank_error_before_merge);
+
+    REQUIRE_FALSE(sketch1.is_empty());
+    REQUIRE(sketch1.get_n() == n);
+    REQUIRE(sketch1.get_min_value() == 0.0f);
+    REQUIRE(sketch1.get_max_value() == n - 1);
+    REQUIRE(sketch1.get_quantile(0.5) == Approx(n / 2).margin(n / 2 * RANK_EPS_FOR_K_200));
+
+    sketch2.update(0);
+    sketch1.merge(sketch2);
+    // rank error should not be affected by a merge with a sketch in exact mode with lower k
+    REQUIRE(sketch1.get_normalized_rank_error(true) == rank_error_before_merge);
+  }
+
+  SECTION("merge min value from other") {
+    kll_float_sketch sketch1(200, 0);
+    kll_float_sketch sketch2(200, 0);
+    sketch1.update(1.0f);
+    sketch2.update(2.0f);
+    sketch2.merge(sketch1);
+    REQUIRE(sketch2.get_min_value() == 1.0f);
+    REQUIRE(sketch2.get_max_value() == 2.0f);
+  }
+
+  SECTION("merge min and max values from other") {
+    kll_float_sketch sketch1(200, 0);
+    for (int i = 0; i < 1000000; i++) sketch1.update(static_cast<float>(i));
+    kll_float_sketch sketch2(200, 0);
+    sketch2.merge(sketch1);
+    REQUIRE(sketch2.get_min_value() == 0.0f);
+    REQUIRE(sketch2.get_max_value() == 999999.0f);
+  }
+
+  SECTION("sketch of ints") {
+    kll_sketch<int> sketch;
+    REQUIRE_THROWS_AS(sketch.get_quantile(0), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch.get_min_value(), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch.get_max_value(), std::runtime_error);
+
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch.update(i);
+
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch.get_serialized_size_bytes());
+    auto sketch2 = kll_sketch<int>::deserialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE(sketch2.is_empty() == sketch.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(0) == sketch.get_rank(0));
+    REQUIRE(sketch2.get_rank(n) == sketch.get_rank(n));
+  }
+
+  SECTION("sketch of strings stream") {
+    kll_string_sketch sketch1(200, 0);
+    REQUIRE_THROWS_AS(sketch1.get_quantile(0), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch1.get_min_value(), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch1.get_max_value(), std::runtime_error);
+    REQUIRE(sketch1.get_serialized_size_bytes() == 8);
+
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch1.update(std::to_string(i));
+
+    REQUIRE(sketch1.get_min_value() == std::string("0"));
+    REQUIRE(sketch1.get_max_value() == std::string("999"));
+
+    std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
+    sketch1.serialize(s);
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch1.get_serialized_size_bytes());
+    auto sketch2 = kll_string_sketch::deserialize(s, test_allocator<std::string>(0));
+    REQUIRE(static_cast<size_t>(s.tellp()) == sketch2.get_serialized_size_bytes());
+    REQUIRE(s.tellg() == s.tellp());
+    REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch1.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch1.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch1.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch1.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch1.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch1.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch1.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch1.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(std::to_string(0)) == sketch1.get_rank(std::to_string(0)));
+    REQUIRE(sketch2.get_rank(std::to_string(n)) == sketch1.get_rank(std::to_string(n)));
+
+    // to take a look using hexdump
+    //std::ofstream os("kll-string.sk");
+    //sketch1.serialize(os);
+
+    // debug print
+    //sketch1.to_stream(std::cout);
+  }
+
+  SECTION("sketch of strings bytes") {
+    kll_string_sketch sketch1(200, 0);
+    REQUIRE_THROWS_AS(sketch1.get_quantile(0), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch1.get_min_value(), std::runtime_error);
+    REQUIRE_THROWS_AS(sketch1.get_max_value(), std::runtime_error);
+    REQUIRE(sketch1.get_serialized_size_bytes() == 8);
+
+    const int n = 1000;
+    for (int i = 0; i < n; i++) sketch1.update(std::to_string(i));
+
+    REQUIRE(sketch1.get_min_value() == std::string("0"));
+    REQUIRE(sketch1.get_max_value() == std::string("999"));
+
+    auto bytes = sketch1.serialize();
+    REQUIRE(bytes.size() == sketch1.get_serialized_size_bytes());
+    auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+    REQUIRE(sketch2.is_empty() == sketch1.is_empty());
+    REQUIRE(sketch2.is_estimation_mode() == sketch1.is_estimation_mode());
+    REQUIRE(sketch2.get_n() == sketch1.get_n());
+    REQUIRE(sketch2.get_num_retained() == sketch1.get_num_retained());
+    REQUIRE(sketch2.get_min_value() == sketch1.get_min_value());
+    REQUIRE(sketch2.get_max_value() == sketch1.get_max_value());
+    REQUIRE(sketch2.get_normalized_rank_error(false) == sketch1.get_normalized_rank_error(false));
+    REQUIRE(sketch2.get_normalized_rank_error(true) == sketch1.get_normalized_rank_error(true));
+    REQUIRE(sketch2.get_quantile(0.5) == sketch1.get_quantile(0.5));
+    REQUIRE(sketch2.get_rank(std::to_string(0)) == sketch1.get_rank(std::to_string(0)));
+    REQUIRE(sketch2.get_rank(std::to_string(n)) == sketch1.get_rank(std::to_string(n)));
+  }
+
+
+  SECTION("sketch of strings, single item, bytes") {
+    kll_string_sketch sketch1(200, 0);
+    sketch1.update("a");
+    auto bytes = sketch1.serialize();
+    REQUIRE(bytes.size() == sketch1.get_serialized_size_bytes());
+    auto sketch2 = kll_string_sketch::deserialize(bytes.data(), bytes.size(), 0);
+    REQUIRE(bytes.size() == sketch2.get_serialized_size_bytes());
+  }
+*/
+  SECTION("copy") { // copies must answer rank queries exactly like the source sketch
+    quantiles_sketch<int> sketch1;
+    const int n(1000);
+    for (int i = 0; i < n; i++) sketch1.update(i);
+
+    // copy constructor
+    quantiles_sketch<int> sketch2(sketch1);
+    for (int i = 0; i < n; i++) {
+      REQUIRE(sketch2.get_rank(i) == sketch1.get_rank(i));
+    }
+
+    // copy assignment, into a default-constructed sketch
+    quantiles_sketch<int> sketch3;
+    sketch3 = sketch1;
+    for (int i = 0; i < n; i++) {
+      REQUIRE(sketch3.get_rank(i) == sketch1.get_rank(i));
+    }
+  }
+
+  SECTION("move") { // the moved-to sketch must report the expected ranks; the moved-from one is not queried again
+    quantiles_sketch<int> sketch1;
+    const int n(100);
+    for (int i = 0; i < n; i++) sketch1.update(i);
+
+    // move constructor
+    quantiles_sketch<int> sketch2(std::move(sketch1));
+    for (int i = 0; i < n; i++) {
+      REQUIRE(sketch2.get_rank(i) == (double) i / n); // items are 0..n-1, so i of them are below i
+    }
+
+    // move assignment, into a default-constructed sketch
+    quantiles_sketch<int> sketch3;
+    sketch3 = std::move(sketch2);
+    for (int i = 0; i < n; i++) {
+      REQUIRE(sketch3.get_rank(i) == (double) i / n);
+    }
+  }
+/*
+  SECTION("max serialized size arithmetic type") {
+    REQUIRE(kll_sketch<float>::get_max_serialized_size_bytes(200, 10) == 1968);
+    REQUIRE(kll_sketch<float>::get_max_serialized_size_bytes(200, 100) == 2316);
+    REQUIRE(kll_sketch<float>::get_max_serialized_size_bytes(200, 1000) == 2440);
+    REQUIRE(kll_sketch<float>::get_max_serialized_size_bytes(200, 1000000) == 2800);
+    REQUIRE(kll_sketch<float>::get_max_serialized_size_bytes(200, 1000000000) == 3160);
+  }
+
+  SECTION("max serialized size non-arithmetic type") {
+    REQUIRE(kll_sketch<std::string>::get_max_serialized_size_bytes(200, 10, 4) == 1968);
+    REQUIRE(kll_sketch<std::string>::get_max_serialized_size_bytes(200, 100, 4) == 2316);
+    REQUIRE(kll_sketch<std::string>::get_max_serialized_size_bytes(200, 1000, 4) == 2440);
+    REQUIRE(kll_sketch<std::string>::get_max_serialized_size_bytes(200, 1000000, 4) == 2800);
+    REQUIRE(kll_sketch<std::string>::get_max_serialized_size_bytes(200, 1000000000, 4) == 3160);
+  }
+
+  SECTION("issue #236") {
+    kll_sketch<int8_t> kll;
+    kll.update(1);
+    kll.update(2);
+    kll.update(3);
+    auto blob = kll.serialize();
+    auto kll2 = kll_sketch<int8_t>::deserialize(blob.data(), blob.size());
+  }
+*/
+  // cleanup: the counter was zeroed in the setup step and must be back at zero (presumably bytes still held via test_allocator - confirm)
+  if (test_allocator_total_bytes != 0) { // the assertion is only evaluated when it is going to fail
+    REQUIRE(test_allocator_total_bytes == 0);
+  }
+}
+
+} /* namespace datasketches */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org