You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datasketches.apache.org by al...@apache.org on 2020/01/10 01:18:24 UTC

[incubator-datasketches-cpp] 01/01: moved implementation to separate files

This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch kll_separated_impl
in repository https://gitbox.apache.org/repos/asf/incubator-datasketches-cpp.git

commit 4b62cfcb1b4d64622dedbda7b194972023481198
Author: AlexanderSaydakov <Al...@users.noreply.github.com>
AuthorDate: Thu Jan 9 17:18:10 2020 -0800

    moved implementation to separate files
---
 common/include/serde.hpp                           |   4 +-
 kll/CMakeLists.txt                                 |  11 +-
 kll/include/kll_helper.hpp                         | 267 +-----
 kll/include/kll_helper_impl.hpp                    | 319 +++++++
 kll/include/kll_quantile_calculator.hpp            | 163 +---
 kll/include/kll_quantile_calculator_impl.hpp       | 191 +++++
 kll/include/kll_sketch.hpp                         | 951 +--------------------
 .../{kll_sketch.hpp => kll_sketch_impl.hpp}        | 327 +------
 kll/test/kll_sketch_validation.cpp                 |   6 +-
 9 files changed, 565 insertions(+), 1674 deletions(-)

diff --git a/common/include/serde.hpp b/common/include/serde.hpp
index ba72ff8..4ed6e71 100644
--- a/common/include/serde.hpp
+++ b/common/include/serde.hpp
@@ -20,8 +20,10 @@
 #ifndef DATASKETCHES_SERDE_HPP_
 #define DATASKETCHES_SERDE_HPP_
 
-#include <memory>
 #include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
 
 namespace datasketches {
 
diff --git a/kll/CMakeLists.txt b/kll/CMakeLists.txt
index 3e40dd4..43c53f6 100644
--- a/kll/CMakeLists.txt
+++ b/kll/CMakeLists.txt
@@ -34,7 +34,13 @@ target_include_directories(kll
 target_link_libraries(kll INTERFACE common)
 target_compile_features(kll INTERFACE cxx_std_11)
 
-set(kll_HEADERS "include/kll_sketch.hpp;include/kll_helper.hpp;include/kll_quantile_calculator.hpp")
+set(kll_HEADERS "")
+list(APPEND kll_HEADERS "include/kll_sketch.hpp")
+list(APPEND kll_HEADERS "include/kll_sketch_impl.hpp")
+list(APPEND kll_HEADERS "include/kll_helper.hpp")
+list(APPEND kll_HEADERS "include/kll_helper_impl.hpp")
+list(APPEND kll_HEADERS "include/kll_quantile_calculator.hpp")
+list(APPEND kll_HEADERS "include/kll_quantile_calculator_impl.hpp")
 
 install(TARGETS kll
   EXPORT ${PROJECT_NAME}
@@ -46,6 +52,9 @@ install(FILES ${kll_HEADERS}
 target_sources(kll
   INTERFACE
     ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_helper.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_helper_impl.hpp
     ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_sketch.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_sketch_impl.hpp
     ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_quantile_calculator.hpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/include/kll_quantile_calculator_impl.hpp
 )
diff --git a/kll/include/kll_helper.hpp b/kll/include/kll_helper.hpp
index d38914e..a13e49b 100644
--- a/kll/include/kll_helper.hpp
+++ b/kll/include/kll_helper.hpp
@@ -22,7 +22,6 @@
 
 #include <random>
 #include <stdexcept>
-#include <algorithm>
 #include <chrono>
 
 namespace datasketches {
@@ -42,70 +41,15 @@ static const uint64_t powers_of_three[] =  {1, 3, 9, 27, 81, 243, 729, 2187, 656
 
 class kll_helper {
   public:
-    static bool is_even(uint32_t value) {
-      return (value & 1) == 0;
-    }
-
-    static bool is_odd(uint32_t value) {
-      return (value & 1) > 0;
-    }
-
-    static uint8_t floor_of_log2_of_fraction(uint64_t numer, uint64_t denom) {
-      if (denom > numer) return 0;
-      uint8_t count = 0;
-      while (true) {
-        denom <<= 1;
-        if (denom > numer) return count;
-        count++;
-      }
-    }
-
-    static uint8_t ub_on_num_levels(uint64_t n) {
-      if (n == 0) return 1;
-      return 1 + floor_of_log2_of_fraction(n, 1);
-    }
-
-    static uint32_t compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels) {
-      uint32_t total = 0;
-      for (uint8_t h = 0; h < num_levels; h++) {
-        total += level_capacity(k, num_levels, h, m);
-      }
-      return total;
-    }
-
-    static uint32_t level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid) {
-      if (height >= numLevels) throw std::invalid_argument("height >= numLevels");
-      const uint8_t depth = numLevels - height - 1;
-      return std::max((uint32_t) min_wid, int_cap_aux(k, depth));
-    }
-
-    static uint32_t int_cap_aux(uint16_t k, uint8_t depth) {
-      if (depth > 60) throw std::invalid_argument("depth > 60");
-      if (depth <= 30) return int_cap_aux_aux(k, depth);
-      const uint8_t half = depth / 2;
-      const uint8_t rest = depth - half;
-      const uint32_t tmp = int_cap_aux_aux(k, half);
-      return int_cap_aux_aux(tmp, rest);
-    }
-
-    static uint32_t int_cap_aux_aux(uint16_t k, uint8_t depth) {
-      if (depth > 30) throw std::invalid_argument("depth > 30");
-      const uint64_t twok = k << 1; // for rounding, we pre-multiply by 2
-      const uint64_t tmp = (uint64_t) (((uint64_t) twok << depth) / powers_of_three[depth]);
-      const uint64_t result = (tmp + 1) >> 1; // then here we add 1 and divide by 2
-      if (result > k) throw std::logic_error("result > k");
-      return result;
-    }
-
-    static uint64_t sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels) {
-      uint64_t total = 0;
-      uint64_t weight = 1;
-      for (uint8_t lvl = 0; lvl < num_levels; lvl++) {
-        total += weight * (levels[lvl + 1] - levels[lvl]);
-        weight *= 2;
-      }
-      return total;
-    }
+    static inline bool is_even(uint32_t value);
+    static inline bool is_odd(uint32_t value);
+    static inline uint8_t floor_of_log2_of_fraction(uint64_t numer, uint64_t denom);
+    static inline uint8_t ub_on_num_levels(uint64_t n);
+    static inline uint32_t compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels);
+    static inline uint32_t level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid);
+    static inline uint32_t int_cap_aux(uint16_t k, uint8_t depth);
+    static inline uint32_t int_cap_aux_aux(uint16_t k, uint8_t depth);
+    static inline uint64_t sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels);
 
     /*
      * This version is for floating point types
@@ -140,97 +84,23 @@ class kll_helper {
     }
 
     template <typename T>
-    static void randomly_halve_down(T* buf, uint32_t start, uint32_t length) {
-      if (!is_even(length)) throw std::invalid_argument("length must be even");
-      const uint32_t half_length = length / 2;
-    #ifdef KLL_VALIDATION
-      const uint32_t offset = deterministic_offset();
-    #else
-      const uint32_t offset = random_bit();
-    #endif
-      uint32_t j = start + offset;
-      for (uint32_t i = start; i < (start + half_length); i++) {
-        if (i != j) buf[i] = std::move(buf[j]);
-        j += 2;
-      }
-    }
+    static void randomly_halve_down(T* buf, uint32_t start, uint32_t length);
 
     template <typename T>
-    static void randomly_halve_up(T* buf, uint32_t start, uint32_t length) {
-      if (!is_even(length)) throw std::invalid_argument("length must be even");
-      const uint32_t half_length = length / 2;
-    #ifdef KLL_VALIDATION
-      const uint32_t offset = deterministic_offset();
-    #else
-      const uint32_t offset = random_bit();
-    #endif
-      uint32_t j = (start + length) - 1 - offset;
-      for (uint32_t i = (start + length) - 1; i >= (start + half_length); i--) {
-        if (i != j) buf[i] = std::move(buf[j]);
-        j -= 2;
-      }
-    }
+    static void randomly_halve_up(T* buf, uint32_t start, uint32_t length);
 
     // this version moves objects within the same buffer
     // assumes that destination has initialized objects
     // does not destroy the originals after the move
     template <typename T, typename C>
-    static void merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c) {
-      const uint32_t len_c = len_a + len_b;
-      const uint32_t lim_a = start_a + len_a;
-      const uint32_t lim_b = start_b + len_b;
-      const uint32_t lim_c = start_c + len_c;
-
-      uint32_t a = start_a;
-      uint32_t b = start_b;
-
-      for (uint32_t c = start_c; c < lim_c; c++) {
-        if (a == lim_a) {
-          if (b != c) buf[c] = std::move(buf[b]);
-          b++;
-        } else if (b == lim_b) {
-          if (a != c) buf[c] = std::move(buf[a]);
-          a++;
-        } else if (C()(buf[a], buf[b])) {
-          if (a != c) buf[c] = std::move(buf[a]);
-          a++;
-        } else {
-          if (b != c) buf[c] = std::move(buf[b]);
-          b++;
-        }
-      }
-      if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
-    }
+    static void merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c);
 
     // this version is to merge from two different buffers into a third buffer
     // initializes objects is the destination buffer
     // moves objects from buf_a and destroys the originals
     // copies objects from buf_b
     template <typename T, typename C>
-    static void merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
-      const uint32_t len_c = len_a + len_b;
-      const uint32_t lim_a = start_a + len_a;
-      const uint32_t lim_b = start_b + len_b;
-      const uint32_t lim_c = start_c + len_c;
-
-      uint32_t a = start_a;
-      uint32_t b = start_b;
-
-      for (uint32_t c = start_c; c < lim_c; c++) {
-        if (a == lim_a) {
-          new (&buf_c[c]) T(buf_b[b++]);
-        } else if (b == lim_b) {
-          new (&buf_c[c]) T(std::move(buf_a[a]));
-          buf_a[a++].~T();
-        } else if (C()(buf_a[a], buf_b[b])) {
-          new (&buf_c[c]) T(std::move(buf_a[a]));
-          buf_a[a++].~T();
-        } else {
-          new (&buf_c[c]) T(buf_b[b++]);
-        }
-      }
-      if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
-    }
+    static void merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c);
 
     struct compress_result {
       uint8_t final_num_levels;
@@ -257,123 +127,24 @@ class kll_helper {
      */
     template <typename T, typename C>
     static compress_result general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
-            uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted)
-    {
-      if (num_levels_in == 0) throw std::invalid_argument("num_levels_in == 0"); // things are too weird if zero levels are allowed
-      const uint32_t starting_item_count = in_levels[num_levels_in] - in_levels[0];
-      uint8_t current_num_levels = num_levels_in;
-      uint32_t current_item_count = starting_item_count; // decreases with each compaction
-      uint32_t target_item_count = compute_total_capacity(k, m, current_num_levels); // increases if we add levels
-      bool done_yet = false;
-      out_levels[0] = 0;
-      uint8_t current_level = 0;
-      while (!done_yet) {
-
-        // If we are at the current top level, add an empty level above it for convenience,
-        // but do not increment num_levels until later
-        if (current_level == (current_num_levels - 1)) {
-          in_levels[current_level + 2] = in_levels[current_level + 1];
-        }
-
-        const auto raw_beg = in_levels[current_level];
-        const auto raw_lim = in_levels[current_level + 1];
-        const auto raw_pop = raw_lim - raw_beg;
-
-        if ((current_item_count < target_item_count) or (raw_pop < level_capacity(k, current_num_levels, current_level, m))) {
-          // move level over as is
-          // make sure we are not moving data upwards
-          if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move");
-          std::move(&items[raw_beg], &items[raw_lim], &items[out_levels[current_level]]);
-          out_levels[current_level + 1] = out_levels[current_level] + raw_pop;
-        } else {
-          // The sketch is too full AND this level is too full, so we compact it
-          // Note: this can add a level and thus change the sketches capacities
-
-          const auto pop_above = in_levels[current_level + 2] - raw_lim;
-          const bool odd_pop = is_odd(raw_pop);
-          const auto adj_beg = odd_pop ? 1 + raw_beg : raw_beg;
-          const auto adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
-          const auto half_adj_pop = adj_pop / 2;
-
-          if (odd_pop) { // move one guy over
-            items[out_levels[current_level]] = std::move(items[raw_beg]);
-            out_levels[current_level + 1] = out_levels[current_level] + 1;
-          } else { // even number of items
-            out_levels[current_level + 1] = out_levels[current_level];
-          }
-
-          // level zero might not be sorted, so we must sort it if we wish to compact it
-          if ((current_level == 0) and !is_level_zero_sorted) {
-            std::sort(&items[adj_beg], &items[adj_beg + adj_pop], C());
-          }
-
-          if (pop_above == 0) { // Level above is empty, so halve up
-            randomly_halve_up(items, adj_beg, adj_pop);
-          } else { // Level above is nonempty, so halve down, then merge up
-            randomly_halve_down(items, adj_beg, adj_pop);
-            merge_sorted_arrays<T, C>(items, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
-          }
-
-          // track the fact that we just eliminated some data
-          current_item_count -= half_adj_pop;
-
-          // adjust the boundaries of the level above
-          in_levels[current_level + 1] = in_levels[current_level + 1] - half_adj_pop;
-
-          // increment num_levels if we just compacted the old top level
-          // this creates some more capacity (the size of the new bottom level)
-          if (current_level == (current_num_levels - 1)) {
-            current_num_levels++;
-            target_item_count += level_capacity(k, current_num_levels, 0, m);
-          }
-
-        } // end of code for compacting a level
-
-        // determine whether we have processed all levels yet (including any new levels that we created)
-
-        if (current_level == (current_num_levels - 1)) done_yet = true;
-        current_level++;
-      } // end of loop over levels
-
-      if ((out_levels[current_num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
-
-      for (uint32_t i = current_item_count; i < starting_item_count; i++) items[i].~T();
-
-      compress_result result;
-      result.final_num_levels = current_num_levels;
-      result.final_capacity = target_item_count;
-      result.final_num_items = current_item_count;
-      return result;
-    }
+            uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted);
 
     template<typename T>
-    static void copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
-      while (src_first != src_last) {
-        new (&dst[dst_first++]) T(src[src_first++]);
-      }
-    }
+    static void copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first);
 
     template<typename T>
-    static void move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy) {
-      while (src_first != src_last) {
-        new (&dst[dst_first++]) T(std::move(src[src_first]));
-        if (destroy) src[src_first].~T();
-        src_first++;
-      }
-    }
+    static void move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy);
 
 #ifdef KLL_VALIDATION
   private:
 
-    static uint32_t deterministic_offset() {
-      const uint32_t result(kll_next_offset);
-      kll_next_offset = 1 - kll_next_offset;
-      return result;
-    }
+    static inline uint32_t deterministic_offset();
 #endif
 
 };
 
 } /* namespace datasketches */
 
+#include "kll_helper_impl.hpp"
+
 #endif // KLL_HELPER_HPP_
diff --git a/kll/include/kll_helper_impl.hpp b/kll/include/kll_helper_impl.hpp
new file mode 100644
index 0000000..bc8435f
--- /dev/null
+++ b/kll/include/kll_helper_impl.hpp
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_HELPER_IMPL_HPP_
+#define KLL_HELPER_IMPL_HPP_
+
+#include <algorithm>
+
+namespace datasketches {
+
+bool kll_helper::is_even(uint32_t value) {
+  return (value & 1) == 0;
+}
+
+bool kll_helper::is_odd(uint32_t value) {
+  return (value & 1) > 0;
+}
+
+uint8_t kll_helper::floor_of_log2_of_fraction(uint64_t numer, uint64_t denom) {
+  if (denom > numer) return 0;
+  uint8_t count = 0;
+  while (true) {
+    denom <<= 1;
+    if (denom > numer) return count;
+    count++;
+  }
+}
+
+uint8_t kll_helper::ub_on_num_levels(uint64_t n) {
+  if (n == 0) return 1;
+  return 1 + floor_of_log2_of_fraction(n, 1);
+}
+
+uint32_t kll_helper::compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels) {
+  uint32_t total = 0;
+  for (uint8_t h = 0; h < num_levels; h++) {
+    total += level_capacity(k, num_levels, h, m);
+  }
+  return total;
+}
+
+uint32_t kll_helper::level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid) {
+  if (height >= numLevels) throw std::invalid_argument("height >= numLevels");
+  const uint8_t depth = numLevels - height - 1;
+  return std::max((uint32_t) min_wid, int_cap_aux(k, depth));
+}
+
+uint32_t kll_helper::int_cap_aux(uint16_t k, uint8_t depth) {
+  if (depth > 60) throw std::invalid_argument("depth > 60");
+  if (depth <= 30) return int_cap_aux_aux(k, depth);
+  const uint8_t half = depth / 2;
+  const uint8_t rest = depth - half;
+  const uint32_t tmp = int_cap_aux_aux(k, half);
+  return int_cap_aux_aux(tmp, rest);
+}
+
+uint32_t kll_helper::int_cap_aux_aux(uint16_t k, uint8_t depth) {
+  if (depth > 30) throw std::invalid_argument("depth > 30");
+  const uint64_t twok = k << 1; // for rounding, we pre-multiply by 2
+  const uint64_t tmp = (uint64_t) (((uint64_t) twok << depth) / powers_of_three[depth]);
+  const uint64_t result = (tmp + 1) >> 1; // then here we add 1 and divide by 2
+  if (result > k) throw std::logic_error("result > k");
+  return result;
+}
+
+uint64_t kll_helper::sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels) {
+  uint64_t total = 0;
+  uint64_t weight = 1;
+  for (uint8_t lvl = 0; lvl < num_levels; lvl++) {
+    total += weight * (levels[lvl + 1] - levels[lvl]);
+    weight *= 2;
+  }
+  return total;
+}
+
+template <typename T>
+void kll_helper::randomly_halve_down(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+  const uint32_t half_length = length / 2;
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  uint32_t j = start + offset;
+  for (uint32_t i = start; i < (start + half_length); i++) {
+    if (i != j) buf[i] = std::move(buf[j]);
+    j += 2;
+  }
+}
+
+template <typename T>
+void kll_helper::randomly_halve_up(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+  const uint32_t half_length = length / 2;
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  uint32_t j = (start + length) - 1 - offset;
+  for (uint32_t i = (start + length) - 1; i >= (start + half_length); i--) {
+    if (i != j) buf[i] = std::move(buf[j]);
+    j -= 2;
+  }
+}
+
+// this version moves objects within the same buffer
+// assumes that destination has initialized objects
+// does not destroy the originals after the move
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c) {
+  const uint32_t len_c = len_a + len_b;
+  const uint32_t lim_a = start_a + len_a;
+  const uint32_t lim_b = start_b + len_b;
+  const uint32_t lim_c = start_c + len_c;
+
+  uint32_t a = start_a;
+  uint32_t b = start_b;
+
+  for (uint32_t c = start_c; c < lim_c; c++) {
+    if (a == lim_a) {
+      if (b != c) buf[c] = std::move(buf[b]);
+      b++;
+    } else if (b == lim_b) {
+      if (a != c) buf[c] = std::move(buf[a]);
+      a++;
+    } else if (C()(buf[a], buf[b])) {
+      if (a != c) buf[c] = std::move(buf[a]);
+      a++;
+    } else {
+      if (b != c) buf[c] = std::move(buf[b]);
+      b++;
+    }
+  }
+  if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+// this version is to merge from two different buffers into a third buffer
+// initializes objects is the destination buffer
+// moves objects from buf_a and destroys the originals
+// copies objects from buf_b
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
+  const uint32_t len_c = len_a + len_b;
+  const uint32_t lim_a = start_a + len_a;
+  const uint32_t lim_b = start_b + len_b;
+  const uint32_t lim_c = start_c + len_c;
+
+  uint32_t a = start_a;
+  uint32_t b = start_b;
+
+  for (uint32_t c = start_c; c < lim_c; c++) {
+    if (a == lim_a) {
+      new (&buf_c[c]) T(buf_b[b++]);
+    } else if (b == lim_b) {
+      new (&buf_c[c]) T(std::move(buf_a[a]));
+      buf_a[a++].~T();
+    } else if (C()(buf_a[a], buf_b[b])) {
+      new (&buf_c[c]) T(std::move(buf_a[a]));
+      buf_a[a++].~T();
+    } else {
+      new (&buf_c[c]) T(buf_b[b++]);
+    }
+  }
+  if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+/*
+ * Here is what we do for each level:
+ * If it does not need to be compacted, then simply copy it over.
+ *
+ * Otherwise, it does need to be compacted, so...
+ *   Copy zero or one guy over.
+ *   If the level above is empty, halve up.
+ *   Else the level above is nonempty, so...
+ *        halve down, then merge up.
+ *   Adjust the boundaries of the level above.
+ *
+ * It can be proved that general_compress returns a sketch that satisfies the space constraints
+ * no matter how much data is passed in.
+ * All levels except for level zero must be sorted before calling this, and will still be
+ * sorted afterwards.
+ * Level zero is not required to be sorted before, and may not be sorted afterwards.
+ */
+template <typename T, typename C>
+kll_helper::compress_result kll_helper::general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+        uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted)
+{
+  if (num_levels_in == 0) throw std::invalid_argument("num_levels_in == 0"); // things are too weird if zero levels are allowed
+  const uint32_t starting_item_count = in_levels[num_levels_in] - in_levels[0];
+  uint8_t current_num_levels = num_levels_in;
+  uint32_t current_item_count = starting_item_count; // decreases with each compaction
+  uint32_t target_item_count = compute_total_capacity(k, m, current_num_levels); // increases if we add levels
+  bool done_yet = false;
+  out_levels[0] = 0;
+  uint8_t current_level = 0;
+  while (!done_yet) {
+
+    // If we are at the current top level, add an empty level above it for convenience,
+    // but do not increment num_levels until later
+    if (current_level == (current_num_levels - 1)) {
+      in_levels[current_level + 2] = in_levels[current_level + 1];
+    }
+
+    const auto raw_beg = in_levels[current_level];
+    const auto raw_lim = in_levels[current_level + 1];
+    const auto raw_pop = raw_lim - raw_beg;
+
+    if ((current_item_count < target_item_count) or (raw_pop < level_capacity(k, current_num_levels, current_level, m))) {
+      // move level over as is
+      // make sure we are not moving data upwards
+      if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move");
+      std::move(&items[raw_beg], &items[raw_lim], &items[out_levels[current_level]]);
+      out_levels[current_level + 1] = out_levels[current_level] + raw_pop;
+    } else {
+      // The sketch is too full AND this level is too full, so we compact it
+      // Note: this can add a level and thus change the sketches capacities
+
+      const auto pop_above = in_levels[current_level + 2] - raw_lim;
+      const bool odd_pop = is_odd(raw_pop);
+      const auto adj_beg = odd_pop ? 1 + raw_beg : raw_beg;
+      const auto adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+      const auto half_adj_pop = adj_pop / 2;
+
+      if (odd_pop) { // move one guy over
+        items[out_levels[current_level]] = std::move(items[raw_beg]);
+        out_levels[current_level + 1] = out_levels[current_level] + 1;
+      } else { // even number of items
+        out_levels[current_level + 1] = out_levels[current_level];
+      }
+
+      // level zero might not be sorted, so we must sort it if we wish to compact it
+      if ((current_level == 0) and !is_level_zero_sorted) {
+        std::sort(&items[adj_beg], &items[adj_beg + adj_pop], C());
+      }
+
+      if (pop_above == 0) { // Level above is empty, so halve up
+        randomly_halve_up(items, adj_beg, adj_pop);
+      } else { // Level above is nonempty, so halve down, then merge up
+        randomly_halve_down(items, adj_beg, adj_pop);
+        merge_sorted_arrays<T, C>(items, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
+      }
+
+      // track the fact that we just eliminated some data
+      current_item_count -= half_adj_pop;
+
+      // adjust the boundaries of the level above
+      in_levels[current_level + 1] = in_levels[current_level + 1] - half_adj_pop;
+
+      // increment num_levels if we just compacted the old top level
+      // this creates some more capacity (the size of the new bottom level)
+      if (current_level == (current_num_levels - 1)) {
+        current_num_levels++;
+        target_item_count += level_capacity(k, current_num_levels, 0, m);
+      }
+
+    } // end of code for compacting a level
+
+    // determine whether we have processed all levels yet (including any new levels that we created)
+
+    if (current_level == (current_num_levels - 1)) done_yet = true;
+    current_level++;
+  } // end of loop over levels
+
+  if ((out_levels[current_num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
+
+  for (uint32_t i = current_item_count; i < starting_item_count; i++) items[i].~T();
+
+  compress_result result;
+  result.final_num_levels = current_num_levels;
+  result.final_capacity = target_item_count;
+  result.final_num_items = current_item_count;
+  return result;
+}
+
+template<typename T>
+void kll_helper::copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
+  while (src_first != src_last) {
+    new (&dst[dst_first++]) T(src[src_first++]);
+  }
+}
+
+template<typename T>
+void kll_helper::move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy) {
+  while (src_first != src_last) {
+    new (&dst[dst_first++]) T(std::move(src[src_first]));
+    if (destroy) src[src_first].~T();
+    src_first++;
+  }
+}
+
+#ifdef KLL_VALIDATION
+uint32_t kll_helper::deterministic_offset() {
+  const uint32_t result(kll_next_offset);
+  kll_next_offset = 1 - kll_next_offset;
+  return result;
+}
+#endif
+
+} /* namespace datasketches */
+
+#endif // KLL_HELPER_IMPL_HPP_
diff --git a/kll/include/kll_quantile_calculator.hpp b/kll/include/kll_quantile_calculator.hpp
index 91c906b..f77071e 100644
--- a/kll/include/kll_quantile_calculator.hpp
+++ b/kll/include/kll_quantile_calculator.hpp
@@ -21,10 +21,6 @@
 #define KLL_QUANTILE_CALCULATOR_HPP_
 
 #include <memory>
-#include <cmath>
-#include <assert.h>
-
-#include "kll_helper.hpp"
 
 namespace datasketches {
 
@@ -34,29 +30,9 @@ class kll_quantile_calculator {
   typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
   public:
     // assumes that all levels are sorted including level 0
-    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
-      n_ = n;
-      const uint32_t num_items = levels[num_levels] - levels[0];
-      items_ = A().allocate(num_items);
-      weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
-      levels_size_ = num_levels + 1;
-      levels_ = AllocU32().allocate(levels_size_);
-      populate_from_sketch(items, num_items, levels, num_levels);
-      blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
-      convert_to_preceding_cummulative(weights_, num_items + 1);
-    }
-
-    ~kll_quantile_calculator() {
-      const uint32_t num_items = levels_[num_levels_] - levels_[0];
-      for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
-      A().deallocate(items_, num_items);
-      AllocU64().deallocate(weights_, num_items + 1);
-      AllocU32().deallocate(levels_, levels_size_);
-    }
-
-    T get_quantile(double fraction) const {
-      return approximately_answer_positional_query(pos_of_phi(fraction, n_));
-    }
+    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
+    ~kll_quantile_calculator();
+    T get_quantile(double fraction) const;
 
   private:
     uint64_t n_;
@@ -66,130 +42,19 @@ class kll_quantile_calculator {
     uint8_t levels_size_;
     uint8_t num_levels_;
 
-    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
-      kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
-      uint8_t src_level = 0;
-      uint8_t dst_level = 0;
-      uint64_t weight = 1;
-      uint32_t offset = levels[0];
-      while (src_level < num_levels) {
-        const uint32_t from_index(levels[src_level] - offset);
-        const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
-        if (from_index < to_index) { // skip empty levels
-          std::fill(&weights_[from_index], &weights_[to_index], weight);
-          levels_[dst_level] = from_index;
-          levels_[dst_level + 1] = to_index;
-          dst_level++;
-        }
-        src_level++;
-        weight *= 2;
-      }
-      weights_[num_items] = 0;
-      num_levels_ = dst_level;
-    }
-
-    T approximately_answer_positional_query(uint64_t pos) const {
-      assert (pos < n_);
-      const uint32_t weights_size(levels_[num_levels_] + 1);
-      const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
-      return items_[index];
-    }
-
-    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
-      uint64_t subtotal(0);
-      for (uint32_t i = 0; i < weights_size; i++) {
-        const uint32_t new_subtotal = subtotal + weights[i];
-        weights[i] = subtotal;
-        subtotal = new_subtotal;
-      }
-    }
-
-    static uint64_t pos_of_phi(double phi, uint64_t n) {
-      const uint64_t pos = std::floor(phi * n);
-      return (pos == n) ? n - 1 : pos;
-    }
-
-    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
-      assert (weights_size > 1); // remember, weights_ contains an "extra" position
-      const uint32_t nominal_length(weights_size - 1);
-      assert (weights[0] <= pos);
-      assert (pos < weights[nominal_length]);
-      return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
-    }
-
-    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
-      if (l + 1 == r) {
-        return l;
-      }
-      const uint32_t m(l + (r - l) / 2);
-      if (arr[m] <= pos) {
-        return search_for_chunk_containing_pos(arr, pos, m, r);
-      }
-      return search_for_chunk_containing_pos(arr, pos, l, m);
-    }
-
-    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
-      if (num_levels == 1) return;
-
-      // move the input in preparation for the "ping-pong" reduction strategy
-      auto tmp_items_deleter = [num_items](T* ptr) {
-        for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
-        A().deallocate(ptr, num_items);
-      };
-      std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
-      kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
-      auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
-      std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
-      std::copy(weights, &weights[num_items], tmp_weights.get());
-      blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
-    }
-
-    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
-      if (num_levels == 1) return;
-      const uint8_t num_levels_1 = num_levels / 2;
-      const uint8_t num_levels_2 = num_levels - num_levels_1;
-      assert (num_levels_1 >= 1);
-      assert (num_levels_2 >= num_levels_1);
-      const uint8_t starting_level_1 = starting_level;
-      const uint8_t starting_level_2 = starting_level + num_levels_1;
-      // swap roles of src and dst
-      blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
-      blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
-      tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
-    }
-
-    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
-      const auto from_index_1 = levels[starting_level_1];
-      const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
-      const auto from_index_2 = levels[starting_level_2];
-      const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
-      auto i_src_1 = from_index_1;
-      auto i_src_2 = from_index_2;
-      auto i_dst = from_index_1;
-
-      while ((i_src_1 < to_index_1) and (i_src_2 < to_index_2)) {
-        if (C()(items_src[i_src_1], items_src[i_src_2])) {
-          items_dst[i_dst] = std::move(items_src[i_src_1]);
-          weights_dst[i_dst] = weights_src[i_src_1];
-          i_src_1++;
-        } else {
-          items_dst[i_dst] = std::move(items_src[i_src_2]);
-          weights_dst[i_dst] = weights_src[i_src_2];
-          i_src_2++;
-        }
-        i_dst++;
-      }
-      if (i_src_1 < to_index_1) {
-        std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
-        std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
-      } else if (i_src_2 < to_index_2) {
-        std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
-        std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
-      }
-    }
-
+    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    T approximately_answer_positional_query(uint64_t pos) const;
+    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+    static uint64_t pos_of_phi(double phi, uint64_t n);
+    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
+    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
+    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
 };
 
 } /* namespace datasketches */
 
+#include "kll_quantile_calculator_impl.hpp"
+
 #endif // KLL_QUANTILE_CALCULATOR_HPP_
diff --git a/kll/include/kll_quantile_calculator_impl.hpp b/kll/include/kll_quantile_calculator_impl.hpp
new file mode 100644
index 0000000..16dc06f
--- /dev/null
+++ b/kll/include/kll_quantile_calculator_impl.hpp
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+#define KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+
+#include <memory>
+#include <cmath>
+#include <assert.h>
+
+#include "kll_helper.hpp"
+
+namespace datasketches {
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
+  n_ = n;
+  const uint32_t num_items = levels[num_levels] - levels[0];
+  items_ = A().allocate(num_items);
+  weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
+  levels_size_ = num_levels + 1;
+  levels_ = AllocU32().allocate(levels_size_);
+  populate_from_sketch(items, num_items, levels, num_levels);
+  blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
+  convert_to_preceding_cummulative(weights_, num_items + 1);
+}
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
+  const uint32_t num_items = levels_[num_levels_] - levels_[0];
+  for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
+  A().deallocate(items_, num_items);
+  AllocU64().deallocate(weights_, num_items + 1);
+  AllocU32().deallocate(levels_, levels_size_);
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
+  return approximately_answer_positional_query(pos_of_phi(fraction, n_));
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
+  uint8_t src_level = 0;
+  uint8_t dst_level = 0;
+  uint64_t weight = 1;
+  uint32_t offset = levels[0];
+  while (src_level < num_levels) {
+    const uint32_t from_index(levels[src_level] - offset);
+    const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
+    if (from_index < to_index) { // skip empty levels
+      std::fill(&weights_[from_index], &weights_[to_index], weight);
+      levels_[dst_level] = from_index;
+      levels_[dst_level + 1] = to_index;
+      dst_level++;
+    }
+    src_level++;
+    weight *= 2;
+  }
+  weights_[num_items] = 0;
+  num_levels_ = dst_level;
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
+  assert (pos < n_);
+  const uint32_t weights_size(levels_[num_levels_] + 1);
+  const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
+  return items_[index];
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
+  uint64_t subtotal(0);
+  for (uint32_t i = 0; i < weights_size; i++) {
+    const uint32_t new_subtotal = subtotal + weights[i];
+    weights[i] = subtotal;
+    subtotal = new_subtotal;
+  }
+}
+
+template <typename T, typename C, typename A>
+uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
+  const uint64_t pos = std::floor(phi * n);
+  return (pos == n) ? n - 1 : pos;
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
+  assert (weights_size > 1); // remember, weights_ contains an "extra" position
+  const uint32_t nominal_length(weights_size - 1);
+  assert (weights[0] <= pos);
+  assert (pos < weights[nominal_length]);
+  return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+  if (l + 1 == r) {
+    return l;
+  }
+  const uint32_t m(l + (r - l) / 2);
+  if (arr[m] <= pos) {
+    return search_for_chunk_containing_pos(arr, pos, m, r);
+  }
+  return search_for_chunk_containing_pos(arr, pos, l, m);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  if (num_levels == 1) return;
+
+  // move the input in preparation for the "ping-pong" reduction strategy
+  auto tmp_items_deleter = [num_items](T* ptr) {
+    for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
+    A().deallocate(ptr, num_items);
+  };
+  std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
+  kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
+  auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
+  std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
+  std::copy(weights, &weights[num_items], tmp_weights.get());
+  blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+  if (num_levels == 1) return;
+  const uint8_t num_levels_1 = num_levels / 2;
+  const uint8_t num_levels_2 = num_levels - num_levels_1;
+  assert (num_levels_1 >= 1);
+  assert (num_levels_2 >= num_levels_1);
+  const uint8_t starting_level_1 = starting_level;
+  const uint8_t starting_level_2 = starting_level + num_levels_1;
+  // swap roles of src and dst
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
+  tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
+  const auto from_index_1 = levels[starting_level_1];
+  const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
+  const auto from_index_2 = levels[starting_level_2];
+  const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
+  auto i_src_1 = from_index_1;
+  auto i_src_2 = from_index_2;
+  auto i_dst = from_index_1;
+
+  while ((i_src_1 < to_index_1) and (i_src_2 < to_index_2)) {
+    if (C()(items_src[i_src_1], items_src[i_src_2])) {
+      items_dst[i_dst] = std::move(items_src[i_src_1]);
+      weights_dst[i_dst] = weights_src[i_src_1];
+      i_src_1++;
+    } else {
+      items_dst[i_dst] = std::move(items_src[i_src_2]);
+      weights_dst[i_dst] = weights_src[i_src_2];
+      i_src_2++;
+    }
+    i_dst++;
+  }
+  if (i_src_1 < to_index_1) {
+    std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
+    std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
+  } else if (i_src_2 < to_index_2) {
+    std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
+    std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
+  }
+}
+
+} /* namespace datasketches */
+
+#endif // KLL_QUANTILE_CALCULATOR_IMPL_HPP_
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch.hpp
index 7fc436f..141f55c 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch.hpp
@@ -20,12 +20,8 @@
 #ifndef KLL_SKETCH_HPP_
 #define KLL_SKETCH_HPP_
 
-#include <memory>
-#include <limits>
-#include <iostream>
-#include <iomanip>
 #include <functional>
-#include <cstring>
+#include <memory>
 #include <vector>
 
 #if defined(_MSC_VER)
@@ -33,7 +29,6 @@
 #endif // _MSC_VER
 
 #include "kll_quantile_calculator.hpp"
-#include "kll_helper.hpp"
 #include "serde.hpp"
 
 namespace datasketches {
@@ -244,7 +239,7 @@ class kll_sketch {
     uint8_t get_num_levels() { return num_levels_; }
     uint32_t* get_levels() { return levels_; }
     T* get_items() { return items_; }
-#endif
+    #endif
 
   private:
     /* Serialized sketch layout:
@@ -352,946 +347,8 @@ private:
   const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
 };
 
-// kll_sketch implementation
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(uint16_t k) {
-  if (k < MIN_K or k > MAX_K) {
-    throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
-  }
-  k_ = k;
-  m_ = DEFAULT_M;
-  min_k_ = k;
-  n_ = 0;
-  num_levels_ = 1;
-  levels_size_ = 2;
-  levels_ = new (AllocU32().allocate(2)) uint32_t[2] {k_, k_};
-  items_size_ = k_;
-  items_ = A().allocate(items_size_);
-  min_value_ = A().allocate(1);
-  max_value_ = A().allocate(1);
-  is_level_zero_sorted_ = false;
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other):
-k_(other.k_),
-m_(other.m_),
-min_k_(other.min_k_),
-n_(other.n_),
-num_levels_(other.num_levels_),
-levels_(nullptr),
-levels_size_(other.levels_size_),
-items_(nullptr),
-items_size_(other.items_size_),
-min_value_(nullptr),
-max_value_(nullptr),
-is_level_zero_sorted_(other.is_level_zero_sorted_)
-{
-  levels_ = AllocU32().allocate(levels_size_);
-  std::copy(&other.levels_[0], &other.levels_[levels_size_], levels_);
-  items_ = A().allocate(items_size_);
-  std::copy(other.items_, &other.items_[items_size_], items_);
-  min_value_ = new (A().allocate(1)) T(*other.min_value_);
-  max_value_ = new (A().allocate(1)) T(*other.max_value_);
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(kll_sketch&& other) noexcept:
-k_(other.k_),
-m_(other.m_),
-min_k_(other.min_k_),
-n_(other.n_),
-num_levels_(other.num_levels_),
-levels_(other.levels_),
-levels_size_(other.levels_size_),
-items_(other.items_),
-items_size_(other.items_size_),
-min_value_(other.min_value_),
-max_value_(other.max_value_),
-is_level_zero_sorted_(other.is_level_zero_sorted_)
-{
-  other.levels_ = nullptr;
-  other.items_ = nullptr;
-  other.min_value_ = nullptr;
-  other.max_value_ = nullptr;
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(const kll_sketch& other) {
-  kll_sketch<T, C, S, A> copy(other);
-  std::swap(k_, copy.k_);
-  std::swap(m_, copy.m_);
-  std::swap(min_k_, copy.min_k_);
-  std::swap(n_, copy.n_);
-  std::swap(num_levels_, copy.num_levels_);
-  std::swap(levels_, copy.levels_);
-  std::swap(levels_size_, copy.levels_size_);
-  std::swap(items_, copy.items_);
-  std::swap(items_size_, copy.items_size_);
-  std::swap(min_value_, copy.min_value_);
-  std::swap(max_value_, copy.max_value_);
-  std::swap(is_level_zero_sorted_, copy.is_level_zero_sorted_);
-  return *this;
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch&& other) {
-  std::swap(k_, other.k_);
-  std::swap(m_, other.m_);
-  std::swap(min_k_, other.min_k_);
-  std::swap(n_, other.n_);
-  std::swap(num_levels_, other.num_levels_);
-  std::swap(levels_, other.levels_);
-  std::swap(levels_size_, other.levels_size_);
-  std::swap(items_, other.items_);
-  std::swap(items_size_, other.items_size_);
-  std::swap(min_value_, other.min_value_);
-  std::swap(max_value_, other.max_value_);
-  std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
-  return *this;
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::~kll_sketch() {
-  if (levels_ != nullptr) AllocU32().deallocate(levels_, levels_size_);
-  if (items_ != nullptr) {
-    const uint32_t begin = levels_[0];
-    const uint32_t end = begin + get_num_retained();
-    for (uint32_t i = begin; i < end; i++) items_[i].~T();
-    A().deallocate(items_, items_size_);
-  }
-  if (min_value_ != nullptr) {
-    min_value_->~T();
-    A().deallocate(min_value_, 1);
-  }
-  if (max_value_ != nullptr) {
-    max_value_->~T();
-    A().deallocate(max_value_, 1);
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::update(const T& value) {
-  const uint32_t index = internal_update(value);
-  new (&items_[index]) T(value);
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::update(T&& value) {
-  const uint32_t index = internal_update(value);
-  new (&items_[index]) T(std::move(value));
-}
-
-template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::internal_update(const T& value) {
-  if (is_empty()) {
-    new (min_value_) T(value);
-    new (max_value_) T(value);
-  } else {
-    if (C()(value, *min_value_)) *min_value_ = value;
-    if (C()(*max_value_, value)) *max_value_ = value;
-  }
-  if (levels_[0] == 0) compress_while_updating();
-  n_++;
-  is_level_zero_sorted_ = false;
-  return --levels_[0];
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
-  if (other.is_empty()) return;
-  if (m_ != other.m_) {
-    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
-  }
-  const uint64_t final_n = n_ + other.n_;
-  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
-    update(other.items_[i]);
-  }
-  if (is_empty()) {
-    new (min_value_) T(*other.min_value_);
-    new (max_value_) T(*other.max_value_);
-  } else {
-    if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
-    if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
-  }
-  if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
-  n_ = final_n;
-  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
-  assert_correct_total_weight();
-}
-
-template<typename T, typename C, typename S, typename A>
-bool kll_sketch<T, C, S, A>::is_empty() const {
-  return n_ == 0;
-}
-
-template<typename T, typename C, typename S, typename A>
-uint64_t kll_sketch<T, C, S, A>::get_n() const {
-  return n_;
-}
-
-template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::get_num_retained() const {
-  return levels_[num_levels_] - levels_[0];
-}
-
-template<typename T, typename C, typename S, typename A>
-bool kll_sketch<T, C, S, A>::is_estimation_mode() const {
-  return num_levels_ > 1;
-}
-
-template<typename T, typename C, typename S, typename A>
-T kll_sketch<T, C, S, A>::get_min_value() const {
-  if (is_empty()) return get_invalid_value();
-  return *min_value_;
-}
-
-template<typename T, typename C, typename S, typename A>
-T kll_sketch<T, C, S, A>::get_max_value() const {
-  if (is_empty()) return get_invalid_value();
-  return *max_value_;
-}
-
-template<typename T, typename C, typename S, typename A>
-T kll_sketch<T, C, S, A>::get_quantile(double fraction) const {
-  if (is_empty()) return get_invalid_value();
-  if (fraction == 0.0) return *min_value_;
-  if (fraction == 1.0) return *max_value_;
-  if ((fraction < 0.0) or (fraction > 1.0)) {
-    throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
-  }
-  // has side effect of sorting level zero if needed
-  auto quantile_calculator(const_cast<kll_sketch*>(this)->get_quantile_calculator());
-  return quantile_calculator->get_quantile(fraction);
-}
-
-template<typename T, typename C, typename S, typename A>
-std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
-  std::vector<T, A> quantiles;
-  if (is_empty()) return quantiles;
-  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
-  quantiles.reserve(size);
-  for (uint32_t i = 0; i < size; i++) {
-    const double fraction = fractions[i];
-    if ((fraction < 0.0) or (fraction > 1.0)) {
-      throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
-    }
-    if      (fraction == 0.0) quantiles.push_back(*min_value_);
-    else if (fraction == 1.0) quantiles.push_back(*max_value_);
-    else {
-      if (!quantile_calculator) {
-        // has side effect of sorting level zero if needed
-        quantile_calculator = const_cast<kll_sketch*>(this)->get_quantile_calculator();
-      }
-      quantiles.push_back(quantile_calculator->get_quantile(fraction));
-    }
-  }
-  return quantiles;
-}
-
-template<typename T, typename C, typename S, typename A>
-double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
-  if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
-  uint8_t level(0);
-  uint64_t weight(1);
-  uint64_t total(0);
-  while (level < num_levels_) {
-    const auto from_index(levels_[level]);
-    const auto to_index(levels_[level + 1]); // exclusive
-    for (uint32_t i = from_index; i < to_index; i++) {
-      if (C()(items_[i], value)) {
-        total += weight;
-      } else if ((level > 0) or is_level_zero_sorted_) {
-        break; // levels above 0 are sorted, no point comparing further
-      }
-    }
-    level++;
-    weight *= 2;
-  }
-  return (double) total / n_;
-}
-
-template<typename T, typename C, typename S, typename A>
-vector_d<A> kll_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const {
-  return get_PMF_or_CDF(split_points, size, false);
-}
-
-template<typename T, typename C, typename S, typename A>
-vector_d<A> kll_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const {
-  return get_PMF_or_CDF(split_points, size, true);
-}
-
-template<typename T, typename C, typename S, typename A>
-double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {
-  return get_normalized_rank_error(min_k_, pmf);
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
-  const bool is_single_item = n_ == 1;
-  const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
-  os.write((char*)&preamble_ints, sizeof(preamble_ints));
-  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
-  os.write((char*)&serial_version, sizeof(serial_version));
-  const uint8_t family(FAMILY);
-  os.write((char*)&family, sizeof(family));
-  const uint8_t flags_byte(
-      (is_empty() ? 1 << flags::IS_EMPTY : 0)
-    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
-    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
-  );
-  os.write((char*)&flags_byte, sizeof(flags_byte));
-  os.write((char*)&k_, sizeof(k_));
-  os.write((char*)&m_, sizeof(m_));
-  const uint8_t unused(0);
-  os.write((char*)&unused, sizeof(unused));
-  if (is_empty()) return;
-  if (!is_single_item) {
-    os.write((char*)&n_, sizeof(n_));
-    os.write((char*)&min_k_, sizeof(min_k_));
-    os.write((char*)&num_levels_, sizeof(num_levels_));
-    os.write((char*)&unused, sizeof(unused));
-    os.write((char*)levels_, sizeof(levels_[0]) * num_levels_);
-    S().serialize(os, min_value_, 1);
-    S().serialize(os, max_value_, 1);
-  }
-  S().serialize(os, &items_[levels_[0]], get_num_retained());
-}
-
-template<typename T, typename C, typename S, typename A>
-vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
-  const bool is_single_item = n_ == 1;
-  const size_t size = header_size_bytes + get_serialized_size_bytes();
-  vector_u8<A> bytes(size);
-  uint8_t* ptr = bytes.data() + header_size_bytes;
-  const uint8_t preamble_ints(is_empty() or is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
-  ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
-  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
-  ptr += copy_to_mem(&serial_version, ptr, sizeof(serial_version));
-  const uint8_t family(FAMILY);
-  ptr += copy_to_mem(&family, ptr, sizeof(family));
-  const uint8_t flags_byte(
-      (is_empty() ? 1 << flags::IS_EMPTY : 0)
-    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
-    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
-  );
-  ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
-  ptr += copy_to_mem(&k_, ptr, sizeof(k_));
-  ptr += copy_to_mem(&m_, ptr, sizeof(m_));
-  const uint8_t unused(0);
-  ptr += copy_to_mem(&unused, ptr, sizeof(unused));
-  if (!is_empty()) {
-    if (!is_single_item) {
-      ptr += copy_to_mem(&n_, ptr, sizeof(n_));
-      ptr += copy_to_mem(&min_k_, ptr, sizeof(min_k_));
-      ptr += copy_to_mem(&num_levels_, ptr, sizeof(num_levels_));
-      ptr += copy_to_mem(&unused, ptr, sizeof(unused));
-      ptr += copy_to_mem(levels_, ptr, sizeof(levels_[0]) * num_levels_);
-      ptr += S().serialize(ptr, min_value_, 1);
-      ptr += S().serialize(ptr, max_value_, 1);
-    }
-    ptr += S().serialize(ptr, &items_[levels_[0]], get_num_retained());
-  }
-  const size_t delta = ptr - bytes.data();
-  if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
-  return bytes;
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
-  uint8_t preamble_ints;
-  is.read((char*)&preamble_ints, sizeof(preamble_ints));
-  uint8_t serial_version;
-  is.read((char*)&serial_version, sizeof(serial_version));
-  uint8_t family_id;
-  is.read((char*)&family_id, sizeof(family_id));
-  uint8_t flags_byte;
-  is.read((char*)&flags_byte, sizeof(flags_byte));
-  uint16_t k;
-  is.read((char*)&k, sizeof(k));
-  uint8_t m;
-  is.read((char*)&m, sizeof(m));
-  uint8_t unused;
-  is.read((char*)&unused, sizeof(unused));
-
-  check_m(m);
-  check_preamble_ints(preamble_ints, flags_byte);
-  check_serial_version(serial_version);
-  check_family_id(family_id);
-
-  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
-  return is_empty ? kll_sketch<T, C, S, A>(k) : kll_sketch<T, C, S, A>(k, flags_byte, is);
-}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
-  const char* ptr = static_cast<const char*>(bytes);
-  uint8_t preamble_ints;
-  ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
-  uint8_t serial_version;
-  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
-  uint8_t family_id;
-  ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
-  uint8_t flags_byte;
-  ptr += copy_from_mem(ptr, &flags_byte, sizeof(flags_byte));
-  uint16_t k;
-  ptr += copy_from_mem(ptr, &k, sizeof(k));
-  uint8_t m;
-  ptr += copy_from_mem(ptr, &m, sizeof(m));
-
-  check_m(m);
-  check_preamble_ints(preamble_ints, flags_byte);
-  check_serial_version(serial_version);
-  check_family_id(family_id);
-
-  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
-  return is_empty ? kll_sketch<T, C, S, A>(k) : kll_sketch<T, C, S, A>(k, flags_byte, bytes, size);
-}
-
-/*
- * Gets the normalized rank error given k and pmf.
- * k - the configuration parameter
- * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
- * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
- * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
- */
-template<typename T, typename C, typename S, typename A>
-double kll_sketch<T, C, S, A>::get_normalized_rank_error(uint16_t k, bool pmf) {
-  return pmf
-      ? 2.446 / pow(k, 0.9433)
-      : 2.296 / pow(k, 0.9723);
-}
-
-// for deserialization
-// the common part of the preamble was read and compatibility checks were done
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, std::istream& is) {
-  k_ = k;
-  m_ = DEFAULT_M;
-  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
-  if (is_single_item) {
-    n_ = 1;
-    min_k_ = k_;
-    num_levels_ = 1;
-  } else {
-    is.read((char*)&n_, sizeof(n_));
-    is.read((char*)&min_k_, sizeof(min_k_));
-    is.read((char*)&num_levels_, sizeof(num_levels_));
-    uint8_t unused;
-    is.read((char*)&unused, sizeof(unused));
-  }
-  levels_ = AllocU32().allocate(num_levels_ + 1);
-  levels_size_ = num_levels_ + 1;
-  const uint32_t capacity(kll_helper::compute_total_capacity(k_, m_, num_levels_));
-  if (is_single_item) {
-    levels_[0] = capacity - 1;
-  } else {
-    // the last integer in levels_ is not serialized because it can be derived
-    is.read((char*)levels_, sizeof(levels_[0]) * num_levels_);
-  }
-  levels_[num_levels_] = capacity;
-  min_value_ = A().allocate(1);
-  max_value_ = A().allocate(1);
-  if (!is_single_item) {
-    S().deserialize(is, min_value_, 1);
-    S().deserialize(is, max_value_, 1);
-  }
-  items_ = A().allocate(capacity);
-  items_size_ = capacity;
-  const auto num_items = levels_[num_levels_] - levels_[0];
-  S().deserialize(is, &items_[levels_[0]], num_items);
-  if (is_single_item) {
-    new (min_value_) T(items_[levels_[0]]);
-    new (max_value_) T(items_[levels_[0]]);
-  }
-  is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
-}
-
-// for deserialization
-// the common part of the preamble was read and compatibility checks were done
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size) {
-  k_ = k;
-  m_ = DEFAULT_M;
-  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
-  const char* ptr = static_cast<const char*>(bytes) + DATA_START_SINGLE_ITEM;
-  if (is_single_item) {
-    n_ = 1;
-    min_k_ = k_;
-    num_levels_ = 1;
-  } else {
-    ptr += copy_from_mem(ptr, &n_, sizeof(n_));
-    ptr += copy_from_mem(ptr, &min_k_, sizeof(min_k_));
-    ptr += copy_from_mem(ptr, &num_levels_, sizeof(num_levels_));
-    ptr++; // skip unused byte
-  }
-  levels_ = AllocU32().allocate(num_levels_ + 1);
-  levels_size_ = num_levels_ + 1;
-  const uint32_t capacity(kll_helper::compute_total_capacity(k_, m_, num_levels_));
-  if (is_single_item) {
-    levels_[0] = capacity - 1;
-  } else {
-    // the last integer in levels_ is not serialized because it can be derived
-    ptr += copy_from_mem(ptr, levels_, sizeof(levels_[0]) * num_levels_);
-  }
-  levels_[num_levels_] = capacity;
-  min_value_ = A().allocate(1);
-  max_value_ = A().allocate(1);
-  if (!is_single_item) {
-    ptr += S().deserialize(ptr, min_value_, 1);
-    ptr += S().deserialize(ptr, max_value_, 1);
-  }
-  items_ = A().allocate(capacity);
-  items_size_ = capacity;
-  const auto num_items(levels_[num_levels_] - levels_[0]);
-  ptr += S().deserialize(ptr, &items_[levels_[0]], num_items);
-  if (is_single_item) {
-    new (min_value_) T(items_[levels_[0]]);
-    new (max_value_) T(items_[levels_[0]]);
-  }
-  is_level_zero_sorted_ = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
-  const size_t delta = ptr - static_cast<const char*>(bytes);
-  if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
-}
-
-// The following code is only valid in the special case of exactly reaching capacity while updating.
-// It cannot be used while merging, while reducing k, or anything else.
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::compress_while_updating(void) {
-  const uint8_t level = find_level_to_compact();
-
-  // It is important to add the new top level right here. Be aware that this operation
-  // grows the buffer and shifts the data and also the boundaries of the data and grows the
-  // levels array and increments num_levels_
-  if (level == (num_levels_ - 1)) {
-    add_empty_top_level_to_completely_full_sketch();
-  }
-
-  const uint32_t raw_beg = levels_[level];
-  const uint32_t raw_lim = levels_[level + 1];
-  // +2 is OK because we already added a new top level if necessary
-  const uint32_t pop_above = levels_[level + 2] - raw_lim;
-  const uint32_t raw_pop = raw_lim - raw_beg;
-  const bool odd_pop = kll_helper::is_odd(raw_pop);
-  const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
-  const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
-  const uint32_t half_adj_pop = adj_pop / 2;
-  const uint32_t destroy_beg = levels_[0];
-
-  // level zero might not be sorted, so we must sort it if we wish to compact it
-  // sort_level_zero() is not used here because of the adjustment for odd number of items
-  if ((level == 0) and !is_level_zero_sorted_) {
-    std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop], C());
-  }
-  if (pop_above == 0) {
-    kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
-  } else {
-    kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
-    kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
-  }
-  levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
-  if (odd_pop) {
-    levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
-    if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]); // namely this leftover guy
-  } else {
-    levels_[level] = levels_[level + 1]; // the current level is now empty
-  }
-
-  // verify that we freed up half_adj_pop array slots just below the current level
-  assert (levels_[level] == (raw_beg + half_adj_pop));
-
-  // finally, we need to shift up the data in the levels below
-  // so that the freed-up space can be used by level zero
-  if (level > 0) {
-    const uint32_t amount = raw_beg - levels_[0];
-    std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
-    for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
-  }
-  for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
-}
-
-template<typename T, typename C, typename S, typename A>
-uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
-  uint8_t level = 0;
-  while (true) {
-    assert (level < num_levels_);
-    const uint32_t pop = levels_[level + 1] - levels_[level];
-    const uint32_t cap = kll_helper::level_capacity(k_, num_levels_, level, m_);
-    if (pop >= cap) {
-      return level;
-    }
-    level++;
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::add_empty_top_level_to_completely_full_sketch() {
-  const uint32_t cur_total_cap = levels_[num_levels_];
-
-  // make sure that we are following a certain growth scheme
-  assert (levels_[0] == 0);
-  assert (items_size_ == cur_total_cap);
-
-  // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
-  const uint8_t new_levels_size = num_levels_ + 2;
-  if (levels_size_ < new_levels_size) {
-    uint32_t* new_levels = AllocU32().allocate(new_levels_size);
-    std::copy(&levels_[0], &levels_[levels_size_], new_levels);
-    AllocU32().deallocate(levels_, levels_size_);
-    levels_ = new_levels;
-    levels_size_ = new_levels_size;
-  }
-
-  const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
-  const uint32_t new_total_cap = cur_total_cap + delta_cap;
-
-  // move (and shift) the current data into the new buffer
-  T* new_buf = A().allocate(new_total_cap);
-  kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap, true);
-  A().deallocate(items_, items_size_);
-  items_ = new_buf;
-  items_size_ = new_total_cap;
-
-  // this loop includes the old "extra" index at the top
-  for (uint8_t i = 0; i <= num_levels_; i++) {
-    levels_[i] += delta_cap;
-  }
-
-  assert (levels_[num_levels_] == new_total_cap);
-
-  num_levels_++;
-  levels_[num_levels_] = new_total_cap; // initialize the new "extra" index at the top
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::sort_level_zero() {
-  if (!is_level_zero_sorted_) {
-    std::sort(&items_[levels_[0]], &items_[levels_[1]], C());
-    is_level_zero_sorted_ = true;
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
-  sort_level_zero();
-  typedef typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>> AllocCalc;
-  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
-    new (AllocCalc().allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_, num_levels_, n_),
-    [](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); AllocCalc().deallocate(ptr, 1); }
-  );
-  return quantile_calculator;
-}
-
-template<typename T, typename C, typename S, typename A>
-vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
-  if (is_empty()) return vector_d<A>();
-  kll_helper::validate_values<T, C>(split_points, size);
-  vector_d<A> buckets(size + 1, 0);
-  uint8_t level(0);
-  uint64_t weight(1);
-  while (level < num_levels_) {
-    const auto from_index = levels_[level];
-    const auto to_index = levels_[level + 1]; // exclusive
-    if ((level == 0) and !is_level_zero_sorted_) {
-      increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, buckets.data());
-    } else {
-      increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, buckets.data());
-    }
-    level++;
-    weight *= 2;
-  }
-  // normalize and, if CDF, convert to cumulative
-  if (is_CDF) {
-    double subtotal = 0;
-    for (uint32_t i = 0; i <= size; i++) {
-      subtotal += buckets[i];
-      buckets[i] = subtotal / n_;
-    }
-  } else {
-    for (uint32_t i = 0; i <= size; i++) {
-      buckets[i] /= n_;
-    }
-  }
-  return buckets;
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
-    const T* split_points, uint32_t size, double* buckets) const
-{
-  for (uint32_t i = from_index; i < to_index; i++) {
-    uint32_t j;
-    for (j = 0; j < size; j++) {
-      if (C()(items_[i], split_points[j])) {
-        break;
-      }
-    }
-    buckets[j] += weight;
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
-    const T* split_points, uint32_t size, double* buckets) const
-{
-  uint32_t i = from_index;
-  uint32_t j = 0;
-  while ((i <  to_index) and (j < size)) {
-    if (C()(items_[i], split_points[j])) {
-      buckets[j] += weight; // this sample goes into this bucket
-      i++; // move on to next sample and see whether it also goes into this bucket
-    } else {
-      j++; // no more samples for this bucket
-    }
-  }
-  // now either i == to_index (we are out of samples), or
-  // j == size (we are out of buckets, but there are more samples remaining)
-  // we only need to do something in the latter case
-  if (j == size) {
-    buckets[j] += weight * (to_index - i);
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::merge_higher_levels(const kll_sketch& other, uint64_t final_n) {
-  const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
-  auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
-  const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
-  const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
-  const size_t work_levels_size = ub + 2; // ub+1 does not work
-  auto tmp_levels_deleter = [work_levels_size](uint32_t* ptr) { AllocU32().deallocate(ptr, work_levels_size); };
-  const std::unique_ptr<uint32_t[], decltype(tmp_levels_deleter)> worklevels(AllocU32().allocate(work_levels_size), tmp_levels_deleter);
-  const std::unique_ptr<uint32_t[], decltype(tmp_levels_deleter)> outlevels(AllocU32().allocate(work_levels_size), tmp_levels_deleter);
-
-  const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
-
-  populate_work_arrays(other, workbuf.get(), worklevels.get(), provisional_num_levels);
-
-  const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
-      worklevels.get(), outlevels.get(), is_level_zero_sorted_);
-
-  // ub can sometimes be much bigger
-  if (result.final_num_levels > ub) throw std::logic_error("merge error");
-
-  // now we need to transfer the results back into "this" sketch
-  if (result.final_capacity != items_size_) {
-    A().deallocate(items_, items_size_);
-    items_size_ = result.final_capacity;
-    items_ = A().allocate(items_size_);
-  }
-  const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
-  kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
-
-  if (levels_size_ < (result.final_num_levels + 1)) {
-    AllocU32().deallocate(levels_, levels_size_);
-    levels_size_ = result.final_num_levels + 1;
-    levels_ = AllocU32().allocate(levels_size_);
-  }
-  const uint32_t offset = free_space_at_bottom - outlevels[0];
-  for (uint8_t lvl = 0; lvl < levels_size_; lvl++) { // includes the "extra" index
-    levels_[lvl] = outlevels[lvl] + offset;
-  }
-  num_levels_ = result.final_num_levels;
-}
-
-// this leaves items_ uninitialized (all objects moved out and destroyed)
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
-  worklevels[0] = 0;
-
-  // the level zero data from "other" was already inserted into "this"
-  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
-  worklevels[1] = safe_level_size(0);
-
-  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
-    const uint32_t self_pop = safe_level_size(lvl);
-    const uint32_t other_pop = other.safe_level_size(lvl);
-    worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
-
-    if ((self_pop > 0) and (other_pop == 0)) {
-      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
-    } else if ((self_pop == 0) and (other_pop > 0)) {
-      kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl]);
-    } else if ((self_pop > 0) and (other_pop > 0)) {
-      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
-    }
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
-  const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_));
-  if (total != n_) {
-    throw std::logic_error("Total weight does not match N");
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::safe_level_size(uint8_t level) const {
-  if (level >= num_levels_) return 0;
-  return levels_[level + 1] - levels_[level];
-}
-
-template<typename T, typename C, typename S, typename A>
-uint32_t kll_sketch<T, C, S, A>::get_num_retained_above_level_zero() const {
-  if (num_levels_ == 1) return 0;
-  return levels_[num_levels_] - levels_[1];
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::check_m(uint8_t m) {
-  if (m != DEFAULT_M) {
-    throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M)
-        + ": " + std::to_string(m));
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
-  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
-  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
-  if (is_empty or is_single_item) {
-    if (preamble_ints != PREAMBLE_INTS_SHORT) {
-      throw std::invalid_argument("Possible corruption: preamble ints must be "
-          + std::to_string(PREAMBLE_INTS_SHORT) + " for an empty or single item sketch: " + std::to_string(preamble_ints));
-    }
-  } else {
-    if (preamble_ints != PREAMBLE_INTS_FULL) {
-      throw std::invalid_argument("Possible corruption: preamble ints must be "
-          + std::to_string(PREAMBLE_INTS_FULL) + " for a sketch with more than one item: " + std::to_string(preamble_ints));
-    }
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
-  if (serial_version != SERIAL_VERSION_1 and serial_version != SERIAL_VERSION_2) {
-    throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
-        + std::to_string(SERIAL_VERSION_1) + " or " + std::to_string(SERIAL_VERSION_2)
-        + ", got " + std::to_string(serial_version));
-  }
-}
-
-template<typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
-  if (family_id != FAMILY) {
-    throw std::invalid_argument("Possible corruption: family mismatch: expected "
-        + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
-  }
-}
-
-template <typename T, typename C, typename S, typename A>
-void kll_sketch<T, C, S, A>::to_stream(std::ostream& os, bool print_levels, bool print_items) const {
-  os << "### KLL sketch summary:" << std::endl;
-  os << "   K              : " << k_ << std::endl;
-  os << "   min K          : " << min_k_ << std::endl;
-  os << "   M              : " << (unsigned int) m_ << std::endl;
-  os << "   N              : " << n_ << std::endl;
-  os << "   Epsilon        : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
-  os << "   Epsilon PMF    : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
-  os << "   Empty          : " << (is_empty() ? "true" : "false") << std::endl;
-  os << "   Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
-  os << "   Levels         : " << (unsigned int) num_levels_ << std::endl;
-  os << "   Sorted         : " << (is_level_zero_sorted_ ? "true" : "false") << std::endl;
-  os << "   Capacity items : " << items_size_ << std::endl;
-  os << "   Retained items : " << get_num_retained() << std::endl;
-  os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
-  if (!is_empty()) {
-    os << "   Min value      : " << *min_value_ << std::endl;
-    os << "   Max value      : " << *max_value_ << std::endl;
-  }
-  os << "### End sketch summary" << std::endl;
-
-  // for debugging
-  const bool with_levels(false);
-  const bool with_data(false);
-
-  if (with_levels) {
-    os << "### KLL sketch levels:" << std::endl;
-    os << "   index: nominal capacity, actual size" << std::endl;
-    for (uint8_t i = 0; i < num_levels_; i++) {
-      os << "   " << (unsigned int) i << ": " << kll_helper::level_capacity(k_, num_levels_, i, m_) << ", " << safe_level_size(i) << std::endl;
-    }
-    os << "### End sketch levels" << std::endl;
-  }
-
-  if (with_data) {
-    os << "### KLL sketch data:" << std::endl;
-    uint8_t level(0);
-    while (level < num_levels_) {
-      const uint32_t from_index = levels_[level];
-      const uint32_t to_index = levels_[level + 1]; // exclusive
-      if (from_index < to_index) {
-        os << " level " << (unsigned int) level << ":" << std::endl;
-      }
-      for (uint32_t i = from_index; i < to_index; i++) {
-        os << "   " << items_[i] << std::endl;
-      }
-      level++;
-    }
-    os << "### End sketch data" << std::endl;
-  }
-}
-
-template <typename T, typename C, typename S, typename A>
-typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::begin() const {
-  return kll_sketch<T, C, S, A>::const_iterator(items_, levels_, num_levels_);
-}
-
-template <typename T, typename C, typename S, typename A>
-typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::end() const {
-  return kll_sketch<T, C, S, A>::const_iterator(nullptr, nullptr, num_levels_);
-}
-
-// kll_sketch::const_iterator implementation
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::const_iterator::const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels):
-items(items), levels(levels), num_levels(num_levels), index(levels == nullptr ? 0 : levels[0]), level(levels == nullptr ? num_levels : 0), weight(1)
-{}
-
-template<typename T, typename C, typename S, typename A>
-kll_sketch<T, C, S, A>::const_iterator::const_iterator(const const_iterator& other):
-items(other.items), levels(other.levels), num_levels(other.num_levels), index(other.index), level(other.level), weight(other.weight)
-{}
-
-template<typename T, typename C, typename S, typename A>
-typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++() {
-  ++index;
-  if (index == levels[level + 1]) { // go to the next non-empty level
-    do {
-      ++level;
-      weight *= 2;
-    } while (level < num_levels and levels[level] == levels[level + 1]);
-  }
-  return *this;
-}
-
-template<typename T, typename C, typename S, typename A>
-typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++(int) {
-  const_iterator tmp(*this);
-  operator++();
-  return tmp;
-}
-
-template<typename T, typename C, typename S, typename A>
-bool kll_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
-  if (level != other.level) return false;
-  if (level == num_levels) return true; // end
-  return index == other.index;
-}
-
-template<typename T, typename C, typename S, typename A>
-bool kll_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
-  return !operator==(other);
-}
-
-template<typename T, typename C, typename S, typename A>
-const std::pair<const T&, const uint64_t> kll_sketch<T, C, S, A>::const_iterator::operator*() const {
-  return std::pair<const T&, const uint64_t>(items[index], weight);
-}
-
 } /* namespace datasketches */
 
+#include "kll_sketch_impl.hpp"
+
 #endif
diff --git a/kll/include/kll_sketch.hpp b/kll/include/kll_sketch_impl.hpp
similarity index 70%
copy from kll/include/kll_sketch.hpp
copy to kll/include/kll_sketch_impl.hpp
index 7fc436f..96efb55 100644
--- a/kll/include/kll_sketch.hpp
+++ b/kll/include/kll_sketch_impl.hpp
@@ -17,343 +17,20 @@
  * under the License.
  */
 
-#ifndef KLL_SKETCH_HPP_
-#define KLL_SKETCH_HPP_
+#ifndef KLL_SKETCH_IMPL_HPP_
+#define KLL_SKETCH_IMPL_HPP_
 
-#include <memory>
-#include <limits>
 #include <iostream>
 #include <iomanip>
-#include <functional>
-#include <cstring>
-#include <vector>
 
 #if defined(_MSC_VER)
 #include <iso646.h> // for and/or keywords
 #endif // _MSC_VER
 
-#include "kll_quantile_calculator.hpp"
 #include "kll_helper.hpp"
-#include "serde.hpp"
 
 namespace datasketches {
 
-/*
- * Implementation of a very compact quantiles sketch with lazy compaction scheme
- * and nearly optimal accuracy per retained item.
- * See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
- *
- * <p>This is a stochastic streaming sketch that enables near-real time analysis of the
- * approximate distribution of values from a very large stream in a single pass, requiring only
- * that the values are comparable.
- * The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
- * inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
- * (Cumulative Distribution Function).
- *
- * <p>Given an input stream of <i>N</i> numeric values, the <i>absolute rank</i> of any specific
- * value is defined as its index <i>(0 to N-1)</i> in the hypothetical sorted stream of all
- * <i>N</i> input values.
- *
- * <p>The <i>normalized rank</i> (<i>rank</i>) of any specific value is defined as its
- * <i>absolute rank</i> divided by <i>N</i>.
- * Thus, the <i>normalized rank</i> is a value between zero and one.
- * In the documentation for this sketch <i>absolute rank</i> is never used so any
- * reference to just <i>rank</i> should be interpreted to mean <i>normalized rank</i>.
- *
- * <p>This sketch is configured with a parameter <i>k</i>, which affects the size of the sketch
- * and its estimation error.
- *
- * <p>The estimation error is commonly called <i>epsilon</i> (or <i>eps</i>) and is a fraction
- * between zero and one. Larger values of <i>k</i> result in smaller values of epsilon.
- * Epsilon is always with respect to the rank and cannot be applied to the
- * corresponding values.
- *
- * <p>The relationship between the normalized rank and the corresponding values can be viewed
- * as a two dimensional monotonic plot with the normalized rank on one axis and the
- * corresponding values on the other axis. If the y-axis is specified as the value-axis and
- * the x-axis as the normalized rank, then <i>y = get_quantile(x)</i> is a monotonically
- * increasing function.
- *
- * <p>The functions <i>get_quantile(rank)</i> and get_quantiles(...) translate ranks into
- * corresponding values. The functions <i>get_rank(value),
- * get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
- * (Probability Mass Function)</i> perform the opposite operation and translate values into ranks.
- *
- * <p>The <i>getPMF(...)</i> function has about 13 to 47% worse rank error (depending
- * on <i>k</i>) than the other queries because the mass of each "bin" of the PMF has
- * "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
- * as the errors from the two edges can sometimes add.
- *
- * <p>The default <i>k</i> of 200 yields a "single-sided" epsilon of about 1.33% and a
- * "double-sided" (PMF) epsilon of about 1.65%.
- *
- * <p>A <i>get_quantile(rank)</i> query has the following guarantees:
- * <ul>
- * <li>Let <i>v = get_quantile(r)</i> where <i>r</i> is the rank between zero and one.</li>
- * <li>The value <i>v</i> will be a value from the input stream.</li>
- * <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
- * stream of all <i>N</i> values.</li>
- * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
- * <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%. Note that the
- * error is on the rank, not the value.</li>
- * </ul>
- *
- * <p>A <i>get_rank(value)</i> query has the following guarantees:
- * <ul>
- * <li>Let <i>r = get_rank(v)</i> where <i>v</i> is a value between the min and max values of
- * the input stream.</li>
- * <li>Let <i>true_rank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
- * stream of all <i>N</i> values.</li>
- * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
- * <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%.</li>
- * </ul>
- *
- * <p>A <i>get_PMF()</i> query has the following guarantees:
- * <ul>
- * <li>Let <i>{r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
- * between the min and max values of the input stream.
- * <li>Let <i>mass<sub>i</sub> = estimated mass between v<sub>i</sub> and v<sub>i+1</sub></i>.</li>
- * <li>Let <i>trueMass</i> be the true mass between the values of <i>v<sub>i</sub>,
- * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
- * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
- * <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
- * <li>r(m+1) includes the mass of all points larger than vm.</li>
- * </ul>
- *
- * <p>A <i>get_CDF(...)</i> query has the following guarantees;
- * <ul>
- * <li>Let <i>{r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
- * between the min and max values of the input stream.
- * <li>Let <i>mass<sub>i</sub> = r<sub>i+1</sub> - r<sub>i</sub></i>.</li>
- * <li>Let <i>trueMass</i> be the true mass between the true ranks of <i>v<sub>i</sub>,
- * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
- * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
- * <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
- * <li>1 - r(m+1) includes the mass of all points larger than vm.</li>
- * </ul>
- *
- * <p>From the above, it might seem like we could make some estimates to bound the
- * <em>value</em> returned from a call to <em>get_quantile()</em>. The sketch, however, does not
- * let us derive error bounds or confidences around values. Because errors are independent, we
- * can approximately bracket a value as shown below, but there are no error estimates available.
- * Additionally, the interval may be quite large for certain distributions.
- * <ul>
- * <li>Let <i>v = get_quantile(r)</i>, the estimated quantile value of rank <i>r</i>.</li>
- * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
- * <li>Let <i>v<sub>lo</sub></i> = estimated quantile value of rank <i>(r - eps)</i>.</li>
- * <li>Let <i>v<sub>hi</sub></i> = estimated quantile value of rank <i>(r + eps)</i>.</li>
- * <li>Then <i>v<sub>lo</sub> &le; v &le; v<sub>hi</sub></i>, with 99% confidence.</li>
- * </ul>
- *
- * author Kevin Lang
- * author Alexander Saydakov
- * author Lee Rhodes
- */
-
-template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
-template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
-template<typename A> using AllocD = typename std::allocator_traits<A>::template rebind_alloc<double>;
-template<typename A> using vector_d = std::vector<double, AllocD<A>>;
-
-template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
-class kll_sketch {
-  typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
-
-  public:
-    static const uint8_t DEFAULT_M = 8;
-    static const uint16_t DEFAULT_K = 200;
-    static const uint16_t MIN_K = DEFAULT_M;
-    static const uint16_t MAX_K = (1 << 16) - 1;
-
-    explicit kll_sketch(uint16_t k = DEFAULT_K);
-    kll_sketch(const kll_sketch& other);
-    kll_sketch(kll_sketch&& other) noexcept;
-    ~kll_sketch();
-    kll_sketch& operator=(const kll_sketch& other);
-    kll_sketch& operator=(kll_sketch&& other);
-    void update(const T& value);
-    void update(T&& value);
-    void merge(const kll_sketch& other);
-    bool is_empty() const;
-    uint64_t get_n() const;
-    uint32_t get_num_retained() const;
-    bool is_estimation_mode() const;
-    T get_min_value() const;
-    T get_max_value() const;
-    T get_quantile(double fraction) const;
-    std::vector<T, A> get_quantiles(const double* fractions, uint32_t size) const;
-    double get_rank(const T& value) const;
-    vector_d<A> get_PMF(const T* split_points, uint32_t size) const;
-    vector_d<A> get_CDF(const T* split_points, uint32_t size) const;
-    double get_normalized_rank_error(bool pmf) const;
-
-    // implementation for fixed-size arithmetic types (integral and floating point)
-    template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
-    size_t get_serialized_size_bytes() const {
-      if (is_empty()) { return EMPTY_SIZE_BYTES; }
-      if (num_levels_ == 1 and get_num_retained() == 1) {
-        return DATA_START_SINGLE_ITEM + sizeof(TT);
-      }
-      // the last integer in the levels_ array is not serialized because it can be derived
-      return DATA_START + num_levels_ * sizeof(uint32_t) + (get_num_retained() + 2) * sizeof(TT);
-    }
-
-    // implementation for all other types
-    template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
-    size_t get_serialized_size_bytes() const {
-      if (is_empty()) { return EMPTY_SIZE_BYTES; }
-      if (num_levels_ == 1 and get_num_retained() == 1) {
-        return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
-      }
-      // the last integer in the levels_ array is not serialized because it can be derived
-      size_t size = DATA_START + num_levels_ * sizeof(uint32_t);
-      size += S().size_of_item(*min_value_);
-      size += S().size_of_item(*max_value_);
-      for (auto& it: *this) size += S().size_of_item(it.first);
-      return size;
-    }
-
-    // this may need to be specialized to return correct size if sizeof(T) does not match the actual serialized size of an item
-    // this method is for the user's convenience to predict the sketch size before serialization
-    // and is not used in the serialization and deserialization code
-    // predicting the size before serialization may not make sense if the item type is not of a fixed size (like string)
-    static size_t get_max_serialized_size_bytes(uint16_t k, uint64_t n);
-
-    void serialize(std::ostream& os) const;
-    typedef vector_u8<A> vector_bytes; // alias for users
-    vector_bytes serialize(unsigned header_size_bytes = 0) const;
-    static kll_sketch<T, C, S, A> deserialize(std::istream& is);
-    static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size);
-
-    /*
-     * Gets the normalized rank error given k and pmf.
-     * k - the configuration parameter
-     * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
-     * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
-     * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
-     */
-    static double get_normalized_rank_error(uint16_t k, bool pmf);
-
-    void to_stream(std::ostream& os, bool print_levels = false, bool print_items = false) const;
-
-    class const_iterator;
-    const_iterator begin() const;
-    const_iterator end() const;
-
-    #ifdef KLL_VALIDATION
-    uint8_t get_num_levels() { return num_levels_; }
-    uint32_t* get_levels() { return levels_; }
-    T* get_items() { return items_; }
-#endif
-
-  private:
-    /* Serialized sketch layout:
-     *  Adr:
-     *      ||    7    |   6   |    5   |    4   |    3   |    2    |    1   |      0       |
-     *  0   || unused  |   M   |--------K--------|  Flags |  FamID  | SerVer | PreambleInts |
-     *      ||   15    |   14  |   13   |   12   |   11   |   10    |    9   |      8       |
-     *  1   ||-----------------------------------N------------------------------------------|
-     *      ||   23    |   22  |   21   |   20   |   19   |    18   |   17   |      16      |
-     *  2   ||---------------data----------------|-unused-|numLevels|-------min K-----------|
-     */
-
-    static const size_t EMPTY_SIZE_BYTES = 8;
-    static const size_t DATA_START_SINGLE_ITEM = 8;
-    static const size_t DATA_START = 20;
-
-    static const uint8_t SERIAL_VERSION_1 = 1;
-    static const uint8_t SERIAL_VERSION_2 = 2;
-    static const uint8_t FAMILY = 15;
-
-    enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
-
-    static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
-    static const uint8_t PREAMBLE_INTS_FULL = 5;
-
-    uint16_t k_;
-    uint8_t m_; // minimum buffer "width"
-    uint16_t min_k_; // for error estimation after merging with different k
-    uint64_t n_;
-    uint8_t num_levels_;
-    uint32_t* levels_;
-    uint8_t levels_size_;
-    T* items_;
-    uint32_t items_size_;
-    T* min_value_;
-    T* max_value_;
-    bool is_level_zero_sorted_;
-
-    // for deserialization
-    // the common part of the preamble was read and compatibility checks were done
-    kll_sketch(uint16_t k, uint8_t flags_byte, std::istream& is);
-
-    // for deserialization
-    // the common part of the preamble was read and compatibility checks were done
-    kll_sketch(uint16_t k, uint8_t flags_byte, const void* bytes, size_t size);
-
-    // common update code
-    inline uint32_t internal_update(const T& value);
-
-    // The following code is only valid in the special case of exactly reaching capacity while updating.
-    // It cannot be used while merging, while reducing k, or anything else.
-    void compress_while_updating(void);
-
-    uint8_t find_level_to_compact() const;
-    void add_empty_top_level_to_completely_full_sketch();
-    void sort_level_zero();
-    std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();
-    vector_d<A> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
-    void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
-        const T* split_points, uint32_t size, double* buckets) const;
-    void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
-        const T* split_points, uint32_t size, double* buckets) const;
-    void merge_higher_levels(const kll_sketch& other, uint64_t final_n);
-    void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
-    void assert_correct_total_weight() const;
-    uint32_t safe_level_size(uint8_t level) const;
-    uint32_t get_num_retained_above_level_zero() const;
-
-    static void check_m(uint8_t m);
-    static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte);
-    static void check_serial_version(uint8_t serial_version);
-    static void check_family_id(uint8_t family_id);
-
-    // implementation for floating point types
-    template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
-    static TT get_invalid_value() {
-      return std::numeric_limits<TT>::quiet_NaN();
-    }
-
-    // implementation for all other types
-    template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
-    static TT get_invalid_value() {
-      throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
-    }
-
-};
-
-template<typename T, typename C, typename S, typename A>
-class kll_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
-public:
-  friend class kll_sketch<T, C, S, A>;
-  const_iterator(const const_iterator& other);
-  const_iterator& operator++();
-  const_iterator& operator++(int);
-  bool operator==(const const_iterator& other) const;
-  bool operator!=(const const_iterator& other) const;
-  const std::pair<const T&, const uint64_t> operator*() const;
-private:
-  const T* items;
-  const uint32_t* levels;
-  const uint8_t num_levels;
-  uint32_t index;
-  uint8_t level;
-  uint64_t weight;
-  const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
-};
-
-// kll_sketch implementation
-
 template<typename T, typename C, typename S, typename A>
 kll_sketch<T, C, S, A>::kll_sketch(uint16_t k) {
   if (k < MIN_K or k > MAX_K) {
diff --git a/kll/test/kll_sketch_validation.cpp b/kll/test/kll_sketch_validation.cpp
index 5333936..0faa3fe 100644
--- a/kll/test/kll_sketch_validation.cpp
+++ b/kll/test/kll_sketch_validation.cpp
@@ -179,8 +179,8 @@ class kll_sketch_validation: public CppUnit::TestFixture {
       unsigned num_samples = sketch.get_num_retained();
       int64_t hashed_samples = simple_hash_of_sub_array(sketch.get_items(), sketch.get_levels()[0], num_samples);
       std::cout << i;
-      assert (correct_results[7 * i + 4] == num_levels);
-      assert (correct_results[7 * i + 5] == num_samples);
+      CPPUNIT_ASSERT(correct_results[7 * i + 4] == num_levels);
+      CPPUNIT_ASSERT(correct_results[7 * i + 5] == num_samples);
       if (correct_results[7 * i + 6] == hashed_samples) {
         std::cout << " pass" << std::endl;
       } else {
@@ -201,7 +201,7 @@ class kll_sketch_validation: public CppUnit::TestFixture {
       cur &= mask;
       arr[i] = cur;
     }
-    return std::move(arr);
+    return arr;
   }
 
   static int64_t simple_hash_of_sub_array(const float* arr, unsigned start, unsigned length) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datasketches.apache.org
For additional commands, e-mail: commits-help@datasketches.apache.org