You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/07/28 08:00:23 UTC

[impala] 01/02: IMPALA-9882: Import KLL functionality from Apache DataSketches

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a8fd3213f6e54d73658b66cddcc5b0c955dda283
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Wed Jun 24 10:46:13 2020 +0200

    IMPALA-9882: Import KLL functionality from Apache DataSketches
    
    First, I updated our existing snapshot of DataSketches to the
    following commit:
    c67d92faad3827932ca3b5d864222e64977f2c20
    "Merge pull request #166 from gaborkaszab/const_cast"
    This affects files originated from kll/ and common/ directories of
    the DataSketches repo.
    
    Then I copied all the files needed for KLL into our snapshot
    directory.
    
    You can find the original Apache DataSketches files here:
    https://github.com/apache/incubator-datasketches-cpp
    
    This new snapshot however, broke the interface we used for
    serializing hll_union objects with dropping serialize_compact(). As a
    solution I had to make changes to the serialization and merging
    phases of the union operator by not serializing hll_union itself but
    the underlying hll_sketch instead.
    
    Change-Id: I848488d5145c808109bd50aecfbf3ef83f981943
    Reviewed-on: http://gerrit.cloudera.org:8080/16196
    Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/CMakeLists.txt                        |    2 +-
 be/src/exprs/aggregate-functions-ir.cc             |   39 +-
 be/src/exprs/datasketches-test.cc                  |   43 +-
 .../datasketches/AuxHashMap-internal.hpp           |   12 +-
 be/src/thirdparty/datasketches/CommonUtil.hpp      |   71 --
 .../CompositeInterpolationXTable-internal.hpp      |    6 +-
 .../datasketches/CompositeInterpolationXTable.hpp  |    6 +-
 .../datasketches/CouponHashSet-internal.hpp        |   19 +-
 .../datasketches/CouponList-internal.hpp           |   18 +-
 .../thirdparty/datasketches/Hll4Array-internal.hpp |    2 +-
 .../thirdparty/datasketches/HllArray-internal.hpp  |   24 +-
 .../thirdparty/datasketches/HllSketch-internal.hpp |   50 +-
 .../datasketches/HllSketchImplFactory.hpp          |    1 -
 .../thirdparty/datasketches/HllUnion-internal.hpp  |   48 +-
 be/src/thirdparty/datasketches/HllUtil.hpp         |   54 +-
 be/src/thirdparty/datasketches/MurmurHash3.h       |   28 +-
 be/src/thirdparty/datasketches/README.md           |   19 +-
 .../datasketches/bounds_binomial_proportions.hpp   |  291 +++++
 ...siteInterpolationXTable.hpp => common_defs.hpp} |   22 +-
 be/src/thirdparty/datasketches/count_zeros.hpp     |  114 ++
 be/src/thirdparty/datasketches/hll.hpp             |   87 +-
 be/src/thirdparty/datasketches/kll_helper.hpp      |  150 +++
 be/src/thirdparty/datasketches/kll_helper_impl.hpp |  319 ++++++
 .../datasketches/kll_quantile_calculator.hpp       |   60 ++
 .../datasketches/kll_quantile_calculator_impl.hpp  |  190 ++++
 be/src/thirdparty/datasketches/kll_sketch.hpp      |  559 ++++++++++
 be/src/thirdparty/datasketches/kll_sketch_impl.hpp | 1130 ++++++++++++++++++++
 .../thirdparty/datasketches/memory_operations.hpp  |   57 +
 be/src/thirdparty/datasketches/serde.hpp           |  196 ++++
 29 files changed, 3256 insertions(+), 361 deletions(-)

diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 7af6145..649be16 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -80,7 +80,7 @@ add_library(ExprsTests STATIC
 )
 add_dependencies(ExprsTests gen-deps)
 
-ADD_UNIFIED_BE_LSAN_TEST(datasketches-test "TestDataSketchesHll.*")
+ADD_UNIFIED_BE_LSAN_TEST(datasketches-test "TestDataSketchesHll.*:TestDataSketchesKll.*")
 ADD_UNIFIED_BE_LSAN_TEST(expr-test "Instantiations/ExprTest.*")
 # Exception to unified be tests: custom main initiailizes LLVM
 ADD_BE_LSAN_TEST(expr-codegen-test)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 5b87d0b..09629b8 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1612,21 +1612,31 @@ BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal&
   return estimate;
 }
 
-/// Auxiliary function that receives an input type that has a serialize_compact()
-/// function (e.g. hll_sketch or hll_union) and returns the serialized version of it
-/// wrapped into a StringVal.
-/// Introducing this variable in the .cc to avoid including the whole DataSketches HLL
+/// Auxiliary function that receives a hll_sketch and returns the serialized version of
+/// it wrapped into a StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
 /// functionality into the header.
-template <typename T>
-StringVal SerializeCompactDsHll(FunctionContext* ctx, const T& input) {
+StringVal SerializeCompactDsHllSketch(FunctionContext* ctx,
+    const datasketches::hll_sketch& sketch) {
   std::stringstream serialized_input;
-  input.serialize_compact(serialized_input);
+  sketch.serialize_compact(serialized_input);
   std::string serialized_input_str = serialized_input.str();
   StringVal dst(ctx, serialized_input_str.size());
   memcpy(dst.ptr, serialized_input_str.c_str(), serialized_input_str.size());
   return dst;
 }
 
+/// Auxiliary function that receives a hll_union, gets the underlying HLL sketch from the
+/// union object and returns the serialized, compacted HLL sketch wrapped into StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
+/// functionality into the header.
+StringVal SerializeDsHllUnion(FunctionContext* ctx,
+    const datasketches::hll_union& ds_union) {
+  std::stringstream serialized_input;
+  datasketches::hll_sketch sketch = ds_union.get_result(DS_HLL_TYPE);
+  return SerializeCompactDsHllSketch(ctx, sketch);
+}
+
 void AggregateFunctions::DsHllInit(FunctionContext* ctx, StringVal* dst) {
   AllocBuffer(ctx, dst, sizeof(datasketches::hll_sketch));
   if (UNLIKELY(dst->is_null)) {
@@ -1670,7 +1680,7 @@ StringVal AggregateFunctions::DsHllSerialize(FunctionContext* ctx,
   DCHECK_EQ(src.len, sizeof(datasketches::hll_sketch));
   datasketches::hll_sketch* sketch_ptr =
       reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
-  StringVal dst = SerializeCompactDsHll(ctx, *sketch_ptr);
+  StringVal dst = SerializeCompactDsHllSketch(ctx, *sketch_ptr);
   ctx->Free(src.ptr);
   return dst;
 }
@@ -1711,7 +1721,7 @@ StringVal AggregateFunctions::DsHllFinalizeSketch(FunctionContext* ctx,
       reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
   StringVal result_str = StringVal::null();
   if (sketch_ptr->get_estimate() > 0.0) {
-    result_str = SerializeCompactDsHll(ctx, *sketch_ptr);
+    result_str = SerializeCompactDsHllSketch(ctx, *sketch_ptr);
   }
   ctx->Free(src.ptr);
   return result_str;
@@ -1751,7 +1761,7 @@ StringVal AggregateFunctions::DsHllUnionSerialize(FunctionContext* ctx,
   DCHECK_EQ(src.len, sizeof(datasketches::hll_union));
   datasketches::hll_union* union_ptr =
       reinterpret_cast<datasketches::hll_union*>(src.ptr);
-  StringVal dst = SerializeCompactDsHll(ctx, *union_ptr);
+  StringVal dst = SerializeDsHllUnion(ctx, *union_ptr);
   ctx->Free(src.ptr);
   return dst;
 }
@@ -1762,13 +1772,14 @@ void AggregateFunctions::DsHllUnionMerge(
   DCHECK(!dst->is_null);
   DCHECK_EQ(dst->len, sizeof(datasketches::hll_union));
 
-  datasketches::hll_union src_union =
-      datasketches::hll_union::deserialize(reinterpret_cast<char*>(src.ptr), src.len);
+  // Note, 'src' is a serialized hll_sketch and not a serialized hll_union.
+  datasketches::hll_sketch src_sketch =
+      datasketches::hll_sketch::deserialize(reinterpret_cast<char*>(src.ptr), src.len);
 
   datasketches::hll_union* dst_union_ptr =
       reinterpret_cast<datasketches::hll_union*>(dst->ptr);
 
-  dst_union_ptr->update(src_union.get_result(DS_HLL_TYPE));
+  dst_union_ptr->update(src_sketch);
 }
 
 StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
@@ -1782,7 +1793,7 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
     ctx->Free(src.ptr);
     return StringVal::null();
   }
-  StringVal result = SerializeCompactDsHll(ctx, sketch);
+  StringVal result = SerializeCompactDsHllSketch(ctx, sketch);
   ctx->Free(src.ptr);
   return result;
 }
diff --git a/be/src/exprs/datasketches-test.cc b/be/src/exprs/datasketches-test.cc
index afd2505..a4175f7 100644
--- a/be/src/exprs/datasketches-test.cc
+++ b/be/src/exprs/datasketches-test.cc
@@ -16,8 +16,10 @@
 // under the License.
 
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/kll_sketch.hpp"
 
 #include <sstream>
+#include <stdlib.h>
 
 #include "testutil/gtest-util.h"
 
@@ -29,8 +31,7 @@ namespace impala {
 // The below code is mostly a copy-paste from the example code found on the official
 // DataSketches web page: https://datasketches.apache.org/docs/HLL/HllCppExample.html
 // The purpose is to create 2 HLL sketches that have overlap in their data, serialize
-// these sketches into files, deserialize them and give a cardinality estimate combining
-// the 2 sketches.
+// them, deserialize them and give a cardinality estimate combining the 2 sketches.
 TEST(TestDataSketchesHll, UseDataSketchesInterface) {
   const int lg_k = 11;
   const auto type = datasketches::HLL_4;
@@ -69,4 +70,42 @@ TEST(TestDataSketchesHll, UseDataSketchesInterface) {
   }
 }
 
+
+// This test is meant to cover that the KLL algorithm from the DataSketches library can
+// be imported into Impala, builds without errors and the basic functionality is
+// available to use.
+// The below code is mostly a copy-paste from the example code found on the official
+// DataSketches web page:
+// https://datasketches.apache.org/docs/Quantiles/QuantilesCppExample.html
+// The purpose is to create 2 KLL sketches that have overlap in their data, serialize
+// them, deserialize them and get an estimate for quantiles after combining the 2
+// sketches.
+TEST(TestDataSketchesKll, UseDataSketchesInterface) {
+  std::stringstream sketch_stream1;
+  std::stringstream sketch_stream2;
+  {
+    datasketches::kll_sketch<float> sketch1;
+    for (int i = 0; i < 100000; ++i) sketch1.update(i);
+    sketch1.serialize(sketch_stream1);
+
+    datasketches::kll_sketch<float> sketch2;
+    for (int i = 30000; i < 130000; ++i) sketch2.update(i);
+    sketch2.serialize(sketch_stream2);
+  }
+
+  {
+    auto sketch1 = datasketches::kll_sketch<float>::deserialize(sketch_stream1);
+    auto sketch2 = datasketches::kll_sketch<float>::deserialize(sketch_stream2);
+    sketch1.merge(sketch2);
+
+    const double fractions[3] {0, 0.5, 1};
+    auto quantiles = sketch1.get_quantiles(fractions, 3);
+    EXPECT_EQ(0, quantiles[0]);
+    // The median is an approximate. Here we check that it is in 2% error range.
+    int exact_median = 65000;
+    EXPECT_LE(abs(quantiles[1] - exact_median), exact_median * 0.02);
+    EXPECT_EQ(129999, quantiles[2]);
+  }
+}
+
 }
diff --git a/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp b/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
index 0d7db7a..9a8e135 100644
--- a/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
+++ b/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
@@ -75,7 +75,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(const void* bytes, size_t len,
   const int* auxPtr = static_cast<const int*>(bytes);
   if (srcCompact) {
     if (len < auxCount * sizeof(int)) {
-      throw std::invalid_argument("Input array too small to hold AuxHashMap image");
+      throw std::out_of_range("Input array too small to hold AuxHashMap image");
     }
     auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
     for (int i = 0; i < auxCount; ++i) {
@@ -87,7 +87,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(const void* bytes, size_t len,
   } else { // updatable
     int itemsToRead = 1 << lgAuxArrInts;
     if (len < itemsToRead * sizeof(int)) {
-      throw std::invalid_argument("Input array too small to hold AuxHashMap image");
+      throw std::out_of_range("Input array too small to hold AuxHashMap image");
     }
     auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
     for (int i = 0; i < itemsToRead; ++i) {
@@ -118,8 +118,10 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(std::istream& is, const int lgConfigK,
     lgArrInts = lgAuxArrInts;
   }
 
-  // TODO: truncated stream will throw exception without freeing memory  
   AuxHashMap<A>* auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
+  typedef std::unique_ptr<AuxHashMap<A>, std::function<void(AuxHashMap<A>*)>> aux_hash_map_ptr;
+  aux_hash_map_ptr aux_ptr(auxHashMap, auxHashMap->make_deleter());
+
   int configKmask = (1 << lgConfigK) - 1;
 
   if (srcCompact) {
@@ -147,7 +149,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(std::istream& is, const int lgConfigK,
     throw std::invalid_argument("Deserialized AuxHashMap has wrong number of entries");
   }
 
-  return auxHashMap;
+  return aux_ptr.release();
 }
 
 template<typename A>
@@ -159,7 +161,7 @@ AuxHashMap<A>::~AuxHashMap<A>() {
 
 template<typename A>
 std::function<void(AuxHashMap<A>*)> AuxHashMap<A>::make_deleter() {
-  return [](AuxHashMap<A>* ptr) {    
+  return [](AuxHashMap<A>* ptr) {
     ptr->~AuxHashMap();
     ahmAlloc().deallocate(ptr, 1);
   };
diff --git a/be/src/thirdparty/datasketches/CommonUtil.hpp b/be/src/thirdparty/datasketches/CommonUtil.hpp
deleted file mode 100644
index 55e7dfc..0000000
--- a/be/src/thirdparty/datasketches/CommonUtil.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// author Kevin Lang, Yahoo Research
-// author Jon Malkin, Yahoo Research
-
-#ifndef _COMMONUTIL_HPP_
-#define _COMMONUTIL_HPP_
-
-#include <cstdint>
-
-namespace datasketches {
-
-class CommonUtil final {
-  public:
-    static unsigned int getNumberOfLeadingZeros(uint64_t x);
-};
-
-#define FCLZ_MASK_56 ((uint64_t) 0x00ffffffffffffff)
-#define FCLZ_MASK_48 ((uint64_t) 0x0000ffffffffffff)
-#define FCLZ_MASK_40 ((uint64_t) 0x000000ffffffffff)
-#define FCLZ_MASK_32 ((uint64_t) 0x00000000ffffffff)
-#define FCLZ_MASK_24 ((uint64_t) 0x0000000000ffffff)
-#define FCLZ_MASK_16 ((uint64_t) 0x000000000000ffff)
-#define FCLZ_MASK_08 ((uint64_t) 0x00000000000000ff)
-
-inline unsigned int CommonUtil::getNumberOfLeadingZeros(const uint64_t x) {
-  if (x == 0)
-    return 64;
-
-  static const uint8_t clzByteCount[256] = {8,7,6,6,5,5,5,5,4,4,4,4,4,4,4,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, [...]
-
-  if (x > FCLZ_MASK_56)
-    return ((unsigned int) ( 0 + clzByteCount[(x >> 56) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_48)
-    return ((unsigned int) ( 8 + clzByteCount[(x >> 48) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_40)
-    return ((unsigned int) (16 + clzByteCount[(x >> 40) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_32)
-    return ((unsigned int) (24 + clzByteCount[(x >> 32) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_24)
-    return ((unsigned int) (32 + clzByteCount[(x >> 24) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_16)
-    return ((unsigned int) (40 + clzByteCount[(x >> 16) & FCLZ_MASK_08]));
-  if (x > FCLZ_MASK_08)
-    return ((unsigned int) (48 + clzByteCount[(x >>  8) & FCLZ_MASK_08]));
-  if (1)
-    return ((unsigned int) (56 + clzByteCount[(x >>  0) & FCLZ_MASK_08]));
-
-}
-
-
-}
-
-#endif // _COMMONUTIL_HPP_
\ No newline at end of file
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp b/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
index a3d0302..10aa047 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
+++ b/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
@@ -36,7 +36,7 @@ static const int yStrides[] =
   {1, 2, 3, 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960, 81920};
 
 template<typename A>
-const int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
+int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
   if (logK < HllUtil<A>::MIN_LOG_K || logK > HllUtil<A>::MAX_LOG_K) {
     throw std::invalid_argument("logK must be in range [" + std::to_string(HllUtil<A>::MIN_LOG_K)
                                 + ", " + std::to_string(HllUtil<A>::MAX_LOG_K) + "]. Found: "
@@ -46,7 +46,7 @@ const int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
 }
 
 template<typename A>
-const int CompositeInterpolationXTable<A>::get_x_arr_length(const int logK) {
+int CompositeInterpolationXTable<A>::get_x_arr_length() {
   return numXArrValues;
 }
 
@@ -797,7 +797,7 @@ static const double xArr[18][numXArrValues] = {
 };
 
 template<typename A>
-const double* const CompositeInterpolationXTable<A>::get_x_arr(const int logK) {
+const double* CompositeInterpolationXTable<A>::get_x_arr(const int logK) {
   if (logK < HllUtil<A>::MIN_LOG_K || logK > HllUtil<A>::MAX_LOG_K) {
     throw std::invalid_argument("logK must be in range [" + std::to_string(HllUtil<A>::MIN_LOG_K)
                                 + ", " + std::to_string(HllUtil<A>::MAX_LOG_K) + "]. Found: "
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp b/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
index 67563c5..8baecbe 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
+++ b/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
@@ -27,10 +27,10 @@ namespace datasketches {
 template<typename A = std::allocator<char>>
 class CompositeInterpolationXTable {
   public:
-    static const int get_y_stride(int logK);
+    static int get_y_stride(int logK);
 
-    static const double* const get_x_arr(int logK);
-    static const int get_x_arr_length(int logK);
+    static const double* get_x_arr(int logK);
+    static int get_x_arr_length();
 };
 
 }
diff --git a/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp b/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
index 84c8199..35facfe 100644
--- a/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
+++ b/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
@@ -63,7 +63,7 @@ std::function<void(HllSketchImpl<A>*)> CouponHashSet<A>::get_deleter() const {
 template<typename A>
 CouponHashSet<A>* CouponHashSet<A>::newSet(const void* bytes, size_t len) {
   if (len < HllUtil<A>::HASH_SET_INT_ARR_START) { // hard-coded 
-    throw std::invalid_argument("Input data length insufficient to hold CouponHashSet");
+    throw std::out_of_range("Input data length insufficient to hold CouponHashSet");
   }
 
   const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -102,12 +102,11 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(const void* bytes, size_t len) {
   const int couponsInArray = (compactFlag ? couponCount : (1 << lgArrInts));
   const size_t expectedLength = HllUtil<A>::HASH_SET_INT_ARR_START + (couponsInArray * sizeof(int));
   if (len < expectedLength) {
-    throw std::invalid_argument("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+    throw std::out_of_range("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
                                 + ", found: " + std::to_string(len));
   }
 
   CouponHashSet<A>* sketch = new (chsAlloc().allocate(1)) CouponHashSet<A>(lgK, tgtHllType);
-  sketch->putOutOfOrderFlag(true);
 
   if (compactFlag) {
     const uint8_t* curPos = data + HllUtil<A>::HASH_SET_INT_ARR_START;
@@ -170,7 +169,8 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(std::istream& is) {
   }
 
   CouponHashSet<A>* sketch = new (chsAlloc().allocate(1)) CouponHashSet<A>(lgK, tgtHllType);
-  sketch->putOutOfOrderFlag(true);
+  typedef std::unique_ptr<CouponHashSet<A>, std::function<void(HllSketchImpl<A>*)>> coupon_hash_set_ptr;
+  coupon_hash_set_ptr ptr(sketch, sketch->get_deleter());
 
   // Don't set couponCount here;
   // we'll set later if updatable, and increment with updates if compact
@@ -181,18 +181,19 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(std::istream& is) {
       sketch->couponUpdate(coupon);
     }
   } else {
-    int* oldArr = sketch->couponIntArr;
-    const size_t oldArrLen = 1 << sketch->lgCouponArrInts;
-    sketch->lgCouponArrInts = lgArrInts;
     typedef typename std::allocator_traits<A>::template rebind_alloc<int> intAlloc;
+    intAlloc().deallocate(sketch->couponIntArr, 1 << sketch->lgCouponArrInts);
+    sketch->lgCouponArrInts = lgArrInts;
     sketch->couponIntArr = intAlloc().allocate(1 << lgArrInts);
     sketch->couponCount = couponCount;
     // for stream processing, read entire list so read pointer ends up set correctly
     is.read((char*)sketch->couponIntArr, (1 << sketch->lgCouponArrInts) * sizeof(int));
-    intAlloc().deallocate(oldArr, oldArrLen);
   } 
 
-  return sketch;
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream"); 
+
+  return ptr.release();
 }
 
 template<typename A>
diff --git a/be/src/thirdparty/datasketches/CouponList-internal.hpp b/be/src/thirdparty/datasketches/CouponList-internal.hpp
index f9d3ca0..1800a37 100644
--- a/be/src/thirdparty/datasketches/CouponList-internal.hpp
+++ b/be/src/thirdparty/datasketches/CouponList-internal.hpp
@@ -34,11 +34,10 @@ CouponList<A>::CouponList(const int lgConfigK, const target_hll_type tgtHllType,
   : HllSketchImpl<A>(lgConfigK, tgtHllType, mode, false) {
     if (mode == hll_mode::LIST) {
       lgCouponArrInts = HllUtil<A>::LG_INIT_LIST_SIZE;
-      oooFlag = false;
     } else { // mode == SET
       lgCouponArrInts = HllUtil<A>::LG_INIT_SET_SIZE;
-      oooFlag = true;
     }
+    oooFlag = false;
     const int arrayLen = 1 << lgCouponArrInts;
     typedef typename std::allocator_traits<A>::template rebind_alloc<int> intAlloc;
     couponIntArr = intAlloc().allocate(arrayLen);
@@ -100,7 +99,7 @@ CouponList<A>* CouponList<A>::copyAs(target_hll_type tgtHllType) const {
 template<typename A>
 CouponList<A>* CouponList<A>::newList(const void* bytes, size_t len) {
   if (len < HllUtil<A>::LIST_INT_ARR_START) {
-    throw std::invalid_argument("Input data length insufficient to hold CouponHashSet");
+    throw std::out_of_range("Input data length insufficient to hold CouponHashSet");
   }
 
   const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -130,7 +129,7 @@ CouponList<A>* CouponList<A>::newList(const void* bytes, size_t len) {
   const int couponsInArray = (compact ? couponCount : (1 << HllUtil<A>::computeLgArrInts(LIST, couponCount, lgK)));
   const size_t expectedLength = HllUtil<A>::LIST_INT_ARR_START + (couponsInArray * sizeof(int));
   if (len < expectedLength) {
-    throw std::invalid_argument("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+    throw std::out_of_range("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
                                 + ", found: " + std::to_string(len));
   }
 
@@ -174,6 +173,8 @@ CouponList<A>* CouponList<A>::newList(std::istream& is) {
   const bool emptyFlag = ((listHeader[HllUtil<A>::FLAGS_BYTE] & HllUtil<A>::EMPTY_FLAG_MASK) ? true : false);
 
   CouponList<A>* sketch = new (clAlloc().allocate(1)) CouponList<A>(lgK, tgtHllType, mode);
+  typedef std::unique_ptr<CouponList<A>, std::function<void(HllSketchImpl<A>*)>> coupon_list_ptr;
+  coupon_list_ptr ptr(sketch, sketch->get_deleter());
   const int couponCount = listHeader[HllUtil<A>::LIST_COUNT_BYTE];
   sketch->couponCount = couponCount;
   sketch->putOutOfOrderFlag(oooFlag); // should always be false for LIST
@@ -186,7 +187,10 @@ CouponList<A>* CouponList<A>::newList(std::istream& is) {
     is.read((char*)sketch->couponIntArr, numToRead * sizeof(int));
   }
 
-  return sketch;
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream"); 
+
+  return ptr.release();
 }
 
 template<typename A>
@@ -296,9 +300,9 @@ HllSketchImpl<A>* CouponList<A>::couponUpdate(int coupon) {
       ++couponCount;
       if (couponCount >= len) { // array full
         if (this->lgConfigK < 8) {
-          return promoteHeapListOrSetToHll(*this); // oooFlag = false
+          return promoteHeapListOrSetToHll(*this);
         }
-        return promoteHeapListToSet(*this); // oooFlag = true;
+        return promoteHeapListToSet(*this);
       }
       return this;
     }
diff --git a/be/src/thirdparty/datasketches/Hll4Array-internal.hpp b/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
index 30b24f7..8498bb8 100644
--- a/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
+++ b/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
@@ -175,7 +175,7 @@ void Hll4Array<A>::internalHll4Update(const int slotNo, const int newVal) {
       //assert(shiftedNewValue >= 0);
 
       if (rawStoredOldValue == HllUtil<A>::AUX_TOKEN) { // 879
-        // Given that we have an AUX_TOKEN, tehre are 4 cases for how to
+        // Given that we have an AUX_TOKEN, there are 4 cases for how to
         // actually modify the data structure
 
         if (shiftedNewValue >= HllUtil<A>::AUX_TOKEN) { // case 1: 881
diff --git a/be/src/thirdparty/datasketches/HllArray-internal.hpp b/be/src/thirdparty/datasketches/HllArray-internal.hpp
index b8c0a57..0a4bdce 100644
--- a/be/src/thirdparty/datasketches/HllArray-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllArray-internal.hpp
@@ -95,7 +95,7 @@ HllArray<A>* HllArray<A>::copyAs(const target_hll_type tgtHllType) const {
 template<typename A>
 HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
   if (len < HllUtil<A>::HLL_BYTE_ARR_START) {
-    throw std::invalid_argument("Input data length insufficient to hold HLL array");
+    throw std::out_of_range("Input data length insufficient to hold HLL array");
   }
 
   const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -124,7 +124,7 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
 
   const int arrayBytes = hllArrBytes(tgtHllType, lgK);
   if (len < static_cast<size_t>(HllUtil<A>::HLL_BYTE_ARR_START + arrayBytes)) {
-    throw std::invalid_argument("Input array too small to hold sketch image");
+    throw std::out_of_range("Input array too small to hold sketch image");
   }
 
   double hip, kxq0, kxq1;
@@ -137,17 +137,20 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
   std::memcpy(&auxCount, data + HllUtil<A>::AUX_COUNT_INT, sizeof(int));
 
   AuxHashMap<A>* auxHashMap = nullptr;
+  typedef std::unique_ptr<AuxHashMap<A>, std::function<void(AuxHashMap<A>*)>> aux_hash_map_ptr;
+  aux_hash_map_ptr aux_ptr;
   if (auxCount > 0) { // necessarily TgtHllType == HLL_4
     int auxLgIntArrSize = (int) data[4];
     const size_t offset = HllUtil<A>::HLL_BYTE_ARR_START + arrayBytes;
     const uint8_t* auxDataStart = data + offset;
     auxHashMap = AuxHashMap<A>::deserialize(auxDataStart, len - offset, lgK, auxCount, auxLgIntArrSize, comapctFlag);
+    aux_ptr = aux_hash_map_ptr(auxHashMap, auxHashMap->make_deleter());
   }
 
   HllArray<A>* sketch = HllSketchImplFactory<A>::newHll(lgK, tgtHllType, startFullSizeFlag);
   sketch->putCurMin(curMin);
   sketch->putOutOfOrderFlag(oooFlag);
-  sketch->putHipAccum(hip);
+  if (!oooFlag) sketch->putHipAccum(hip);
   sketch->putKxQ0(kxq0);
   sketch->putKxQ1(kxq1);
   sketch->putNumAtCurMin(numAtCurMin);
@@ -157,6 +160,7 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
   if (auxHashMap != nullptr)
     ((Hll4Array<A>*)sketch)->putAuxHashMap(auxHashMap);
 
+  aux_ptr.release();
   return sketch;
 }
 
@@ -188,8 +192,9 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
   const int lgK = (int) listHeader[HllUtil<A>::LG_K_BYTE];
   const int curMin = (int) listHeader[HllUtil<A>::HLL_CUR_MIN_BYTE];
 
-  // TODO: truncated stream will throw exception without freeing memory
   HllArray* sketch = HllSketchImplFactory<A>::newHll(lgK, tgtHllType, startFullSizeFlag);
+  typedef std::unique_ptr<HllArray<A>, std::function<void(HllSketchImpl<A>*)>> hll_array_ptr;
+  hll_array_ptr sketch_ptr(sketch, sketch->get_deleter());
   sketch->putCurMin(curMin);
   sketch->putOutOfOrderFlag(oooFlag);
 
@@ -197,7 +202,7 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
   is.read((char*)&hip, sizeof(hip));
   is.read((char*)&kxq0, sizeof(kxq0));
   is.read((char*)&kxq1, sizeof(kxq1));
-  sketch->putHipAccum(hip);
+  if (!oooFlag) sketch->putHipAccum(hip);
   sketch->putKxQ0(kxq0);
   sketch->putKxQ1(kxq1);
 
@@ -214,7 +219,10 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
     ((Hll4Array<A>*)sketch)->putAuxHashMap(auxHashMap);
   }
 
-  return sketch;
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream"); 
+
+  return sketch_ptr.release();
 }
 
 template<typename A>
@@ -405,7 +413,7 @@ double HllArray<A>::getCompositeEstimate() const {
   const double rawEst = getHllRawEstimate(this->lgConfigK, kxq0 + kxq1);
 
   const double* xArr = CompositeInterpolationXTable<A>::get_x_arr(this->lgConfigK);
-  const int xArrLen = CompositeInterpolationXTable<A>::get_x_arr_length(this->lgConfigK);
+  const int xArrLen = CompositeInterpolationXTable<A>::get_x_arr_length();
   const double yStride = CompositeInterpolationXTable<A>::get_y_stride(this->lgConfigK);
 
   if (rawEst < xArr[0]) {
@@ -588,7 +596,7 @@ template<typename A>
 void HllArray<A>::hipAndKxQIncrementalUpdate(uint8_t oldValue, uint8_t newValue) {
   const int configK = 1 << this->getLgConfigK();
   // update hip BEFORE updating kxq
-  hipAccum += configK / (kxq0 + kxq1);
+  if (!oooFlag) hipAccum += configK / (kxq0 + kxq1);
   // update kxq0 and kxq1; subtract first, then add
   if (oldValue < 32) { kxq0 -= INVERSE_POWERS_OF_2[oldValue]; }
   else               { kxq1 -= INVERSE_POWERS_OF_2[oldValue]; }
diff --git a/be/src/thirdparty/datasketches/HllSketch-internal.hpp b/be/src/thirdparty/datasketches/HllSketch-internal.hpp
index 6587fe8..dd16955 100644
--- a/be/src/thirdparty/datasketches/HllSketch-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllSketch-internal.hpp
@@ -25,6 +25,7 @@
 #include "HllSketchImplFactory.hpp"
 #include "CouponList.hpp"
 #include "HllArray.hpp"
+#include "common_defs.hpp"
 
 #include <cstdio>
 #include <cstdlib>
@@ -73,11 +74,6 @@ hll_sketch_alloc<A>::~hll_sketch_alloc() {
 }
 
 template<typename A>
-std::ostream& operator<<(std::ostream& os, const hll_sketch_alloc<A>& sketch) {
-  return sketch.to_string(os, true, true, false, false);
-}
-
-template<typename A>
 hll_sketch_alloc<A>::hll_sketch_alloc(const hll_sketch_alloc<A>& that) :
   sketch_impl(that.sketch_impl->copy())
 {}
@@ -123,7 +119,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const std::string& datum) {
   if (datum.empty()) { return; }
   HashState hashResult;
-  HllUtil<A>::hash(datum.c_str(), datum.length(), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(datum.c_str(), datum.length(), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -131,7 +127,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const uint64_t datum) {
   // no sign extension with 64 bits so no need to cast to signed value
   HashState hashResult;
-  HllUtil<A>::hash(&datum, sizeof(uint64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&datum, sizeof(uint64_t), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -153,7 +149,7 @@ void hll_sketch_alloc<A>::update(const uint8_t datum) {
 template<typename A>
 void hll_sketch_alloc<A>::update(const int64_t datum) {
   HashState hashResult;
-  HllUtil<A>::hash(&datum, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&datum, sizeof(int64_t), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -161,7 +157,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const int32_t datum) {
   int64_t val = static_cast<int64_t>(datum);
   HashState hashResult;
-  HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -169,7 +165,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const int16_t datum) {
   int64_t val = static_cast<int64_t>(datum);
   HashState hashResult;
-  HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -177,7 +173,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const int8_t datum) {
   int64_t val = static_cast<int64_t>(datum);
   HashState hashResult;
-  HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -191,7 +187,7 @@ void hll_sketch_alloc<A>::update(const double datum) {
     d.longBytes = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
   }
   HashState hashResult;
-  HllUtil<A>::hash(&d, sizeof(double), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&d, sizeof(double), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -205,7 +201,7 @@ void hll_sketch_alloc<A>::update(const float datum) {
     d.longBytes = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
   }
   HashState hashResult;
-  HllUtil<A>::hash(&d, sizeof(double), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(&d, sizeof(double), DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -213,7 +209,7 @@ template<typename A>
 void hll_sketch_alloc<A>::update(const void* data, const size_t lengthBytes) {
   if (data == nullptr) { return; }
   HashState hashResult;
-  HllUtil<A>::hash(data, lengthBytes, HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+  HllUtil<A>::hash(data, lengthBytes, DEFAULT_SEED, hashResult);
   coupon_update(HllUtil<A>::coupon(hashResult));
 }
 
@@ -248,21 +244,11 @@ vector_u8<A> hll_sketch_alloc<A>::serialize_updatable() const {
 }
 
 template<typename A>
-std::string hll_sketch_alloc<A>::to_string(const bool summary,
-                                    const bool detail,
-                                    const bool aux_detail,
-                                    const bool all) const {
-  std::ostringstream oss;
-  to_string(oss, summary, detail, aux_detail, all);
-  return oss.str();
-}
-
-template<typename A>
-std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
-                                      const bool summary,
-                                      const bool detail,
-                                      const bool aux_detail,
-                                      const bool all) const {
+string<A> hll_sketch_alloc<A>::to_string(const bool summary,
+                                         const bool detail,
+                                         const bool aux_detail,
+                                         const bool all) const {
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
   if (summary) {
     os << "### HLL sketch summary:" << std::endl
        << "  Log Config K   : " << get_lg_config_k() << std::endl
@@ -279,6 +265,10 @@ std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
          << "  HipAccum       : " << hllArray->getHipAccum() << std::endl
          << "  KxQ0           : " << hllArray->getKxQ0() << std::endl
          << "  KxQ1           : " << hllArray->getKxQ1() << std::endl;
+      if (get_target_type() == HLL_4) {
+        const Hll4Array<A>* hll4_ptr = static_cast<const Hll4Array<A>*>(sketch_impl);
+        os << "  Aux table?     : " << (hll4_ptr->getAuxHashMap() != nullptr ? "true" : "false") << std::endl;
+      }
     } else {
       os << "  Coupon count   : "
          << std::to_string(((CouponList<A>*) sketch_impl)->getCouponCount()) << std::endl;
@@ -350,7 +340,7 @@ std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
     }
   }
 
-  return os;
+  return os.str();
 }
 
 template<typename A>
diff --git a/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp b/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
index eae6f75..eb8dd77 100644
--- a/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
+++ b/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
@@ -56,7 +56,6 @@ CouponHashSet<A>* HllSketchImplFactory<A>::promoteListToSet(const CouponList<A>&
   for (auto coupon: list) {
     chSet->couponUpdate(coupon);
   }
-  chSet->putOutOfOrderFlag(true);
   return chSet;
 }
 
diff --git a/be/src/thirdparty/datasketches/HllUnion-internal.hpp b/be/src/thirdparty/datasketches/HllUnion-internal.hpp
index 0d039f2..5aa184f 100644
--- a/be/src/thirdparty/datasketches/HllUnion-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllUnion-internal.hpp
@@ -66,11 +66,6 @@ hll_union_alloc<A> hll_union_alloc<A>::deserialize(std::istream& is) {
 }
 
 template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_union_alloc<A>& hllUnion) {
-  return hllUnion.to_string(os, true, true, false, false);
-}
-
-template<typename A>
 hll_sketch_alloc<A> hll_union_alloc<A>::get_result(target_hll_type target_type) const {
   return hll_sketch_alloc<A>(gadget, target_type);
 }
@@ -163,38 +158,6 @@ void hll_union_alloc<A>::coupon_update(const int coupon) {
 }
 
 template<typename A>
-vector_u8<A> hll_union_alloc<A>::serialize_compact() const {
-  return gadget.serialize_compact();
-}
-
-template<typename A>
-vector_u8<A> hll_union_alloc<A>::serialize_updatable() const {
-  return gadget.serialize_updatable();
-}
-
-template<typename A>
-void hll_union_alloc<A>::serialize_compact(std::ostream& os) const {
-  return gadget.serialize_compact(os);
-}
-
-template<typename A>
-void hll_union_alloc<A>::serialize_updatable(std::ostream& os) const {
-  return gadget.serialize_updatable(os);
-}
-
-template<typename A>
-std::ostream& hll_union_alloc<A>::to_string(std::ostream& os, const bool summary,
-                                  const bool detail, const bool aux_detail, const bool all) const {
-  return gadget.to_string(os, summary, detail, aux_detail, all);
-}
-
-template<typename A>
-std::string hll_union_alloc<A>::to_string(const bool summary, const bool detail,
-                                   const bool aux_detail, const bool all) const {
-  return gadget.to_string(summary, detail, aux_detail, all);
-}
-
-template<typename A>
 double hll_union_alloc<A>::get_estimate() const {
   return gadget.get_estimate();
 }
@@ -315,7 +278,7 @@ void hll_union_alloc<A>::union_impl(const hll_sketch_alloc<A>& sketch, const int
   if (src_impl->getCurMode() == LIST || src_impl->getCurMode() == SET) {
     if (dst_impl->isEmpty() && src_impl->getLgConfigK() == dst_impl->getLgConfigK()) {
       dst_impl = src_impl->copyAs(HLL_8);
-      gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+      gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
     } else {
       const CouponList<A>* src = static_cast<const CouponList<A>*>(src_impl);
       for (auto coupon: *src) {
@@ -329,21 +292,22 @@ void hll_union_alloc<A>::union_impl(const hll_sketch_alloc<A>& sketch, const int
       const CouponList<A>* src = static_cast<const CouponList<A>*>(dst_impl);
       dst_impl = copy_or_downsample(src_impl, lg_max_k);
       static_cast<Hll8Array<A>*>(dst_impl)->mergeList(*src);
-      gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+      gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
     } else { // gadget is HLL
       if (src_impl->getLgConfigK() < dst_impl->getLgConfigK()) {
         dst_impl = copy_or_downsample(dst_impl, sketch.get_lg_config_k());
-        gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+        gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
       }
       const HllArray<A>* src = static_cast<const HllArray<A>*>(src_impl);
       static_cast<Hll8Array<A>*>(dst_impl)->mergeHll(*src);
       dst_impl->putOutOfOrderFlag(true);
+      static_cast<Hll8Array<A>*>(dst_impl)->putHipAccum(0);
     }
   } else { // src is HLL, gadget is empty
     dst_impl = copy_or_downsample(src_impl, lg_max_k);
-    gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+    gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
   }
-  gadget.sketch_impl = dst_impl;
+  gadget.sketch_impl = dst_impl; // gadget replaced
 }
 
 }
diff --git a/be/src/thirdparty/datasketches/HllUtil.hpp b/be/src/thirdparty/datasketches/HllUtil.hpp
index eb956bc..1bea84a 100644
--- a/be/src/thirdparty/datasketches/HllUtil.hpp
+++ b/be/src/thirdparty/datasketches/HllUtil.hpp
@@ -22,7 +22,8 @@
 
 #include "MurmurHash3.h"
 #include "RelativeErrorTables.hpp"
-#include "CommonUtil.hpp"
+#include "count_zeros.hpp"
+#include "common_defs.hpp"
 
 #include <cmath>
 #include <stdexcept>
@@ -83,8 +84,6 @@ public:
   static const int MIN_LOG_K = 4;
   static const int MAX_LOG_K = 21;
 
-  static const uint64_t DEFAULT_UPDATE_SEED = 9001L;
-
   static const double HLL_HIP_RSE_FACTOR; // sqrt(log(2.0)) = 0.8325546
   static const double HLL_NON_HIP_RSE_FACTOR; // sqrt((3.0 * log(2.0)) - 1.0) = 1.03896
   static const double COUPON_RSE_FACTOR; // 0.409 at transition point not the asymptote
@@ -108,7 +107,6 @@ public:
   static int coupon(const uint64_t hash[]);
   static int coupon(const HashState& hashState);
   static void hash(const void* key, int keyLen, uint64_t seed, HashState& result);
-
   static int checkLgK(int lgK);
   static void checkMemSize(uint64_t minBytes, uint64_t capBytes);
   static inline void checkNumStdDev(int numStdDev);
@@ -118,8 +116,6 @@ public:
   static double invPow2(int e);
   static unsigned int ceilingPowerOf2(unsigned int n);
   static unsigned int simpleIntLog2(unsigned int n); // n must be power of 2
-  static unsigned int getNumberOfLeadingZeros(uint64_t x);
-  static unsigned int numberOfTrailingZeros(uint32_t n);
   static int computeLgArrInts(hll_mode mode, int count, int lgConfigK);
   static double getRelErr(bool upperBound, bool unioned,
                           int lgConfigK, int numStdDev);
@@ -144,7 +140,7 @@ const int HllUtil<A>::LG_AUX_ARR_INTS[] = {
 template<typename A>
 inline int HllUtil<A>::coupon(const uint64_t hash[]) {
   int addr26 = (int) (hash[0] & KEY_MASK_26);
-  int lz = CommonUtil::getNumberOfLeadingZeros(hash[1]);
+  int lz = count_leading_zeros_in_u64(hash[1]);
   int value = ((lz > 62 ? 62 : lz) + 1); 
   return (value << KEY_BITS_26) | addr26;
 }
@@ -152,14 +148,14 @@ inline int HllUtil<A>::coupon(const uint64_t hash[]) {
 template<typename A>
 inline int HllUtil<A>::coupon(const HashState& hashState) {
   int addr26 = (int) (hashState.h1 & KEY_MASK_26);
-  int lz = CommonUtil::getNumberOfLeadingZeros(hashState.h2);  
+  int lz = count_leading_zeros_in_u64(hashState.h2);  
   int value = ((lz > 62 ? 62 : lz) + 1); 
   return (value << KEY_BITS_26) | addr26;
 }
 
 template<typename A>
 inline void HllUtil<A>::hash(const void* key, const int keyLen, const uint64_t seed, HashState& result) {
-  MurmurHash3_x64_128(key, keyLen, DEFAULT_UPDATE_SEED, result);
+  MurmurHash3_x64_128(key, keyLen, seed, result);
 }
 
 template<typename A>
@@ -219,7 +215,7 @@ inline double HllUtil<A>::invPow2(const int e) {
 // compute the next highest power of 2 of 32-bit n
 // taken from https://graphics.stanford.edu/~seander/bithacks.html
 template<typename A>
-inline unsigned int HllUtil<A>::ceilingPowerOf2(unsigned int n) {
+inline uint32_t HllUtil<A>::ceilingPowerOf2(uint32_t n) {
   --n;
   n |= n >> 1;
   n |= n >> 2;
@@ -230,45 +226,11 @@ inline unsigned int HllUtil<A>::ceilingPowerOf2(unsigned int n) {
 }
 
 template<typename A>
-inline unsigned int HllUtil<A>::simpleIntLog2(unsigned int n) {
+inline uint32_t HllUtil<A>::simpleIntLog2(uint32_t n) {
   if (n == 0) {
     throw std::logic_error("cannot take log of 0");
   }
-  const unsigned int e = numberOfTrailingZeros(n);
-  return e;
-}
-
-// taken from https://graphics.stanford.edu/~seander/bithacks.html
-// input is 32-bit word to count zero bits on right
-template<typename A>
-inline unsigned int HllUtil<A>::numberOfTrailingZeros(uint32_t v) {
-  unsigned int c;     // c will be the number of zero bits on the right,
-                      // so if v is 1101000 (base 2), then c will be 3
-  // NOTE: if 0 == v, then c = 31.
-  if (v & 0x1) {
-    // special case for odd v (assumed to happen half of the time)
-    c = 0;
-  } else {
-    c = 1;
-    if ((v & 0xffff) == 0) {  
-      v >>= 16;  
-      c += 16;
-    }
-    if ((v & 0xff) == 0) {  
-      v >>= 8;  
-      c += 8;
-    }
-    if ((v & 0xf) == 0) {  
-      v >>= 4;
-      c += 4;
-    }
-    if ((v & 0x3) == 0) {  
-      v >>= 2;
-      c += 2;
-    }
-    c -= v & 0x1;
-  }
-  return c;	
+  return count_trailing_zeros_in_u32(n);
 }
 
 template<typename A>
diff --git a/be/src/thirdparty/datasketches/MurmurHash3.h b/be/src/thirdparty/datasketches/MurmurHash3.h
index f68e989..b438c7d 100644
--- a/be/src/thirdparty/datasketches/MurmurHash3.h
+++ b/be/src/thirdparty/datasketches/MurmurHash3.h
@@ -132,22 +132,22 @@ FORCE_INLINE void MurmurHash3_x64_128(const void* key, int lenBytes, uint64_t se
 
   switch(lenBytes & 15)
   {
-  case 15: k2 ^= ((uint64_t)tail[14]) << 48; //@suppress("No break at end of case")
-  case 14: k2 ^= ((uint64_t)tail[13]) << 40; //@suppress("No break at end of case")
-  case 13: k2 ^= ((uint64_t)tail[12]) << 32; //@suppress("No break at end of case")
-  case 12: k2 ^= ((uint64_t)tail[11]) << 24; //@suppress("No break at end of case")
-  case 11: k2 ^= ((uint64_t)tail[10]) << 16; //@suppress("No break at end of case")
-  case 10: k2 ^= ((uint64_t)tail[ 9]) << 8;  //@suppress("No break at end of case")
+  case 15: k2 ^= ((uint64_t)tail[14]) << 48; // falls through
+  case 14: k2 ^= ((uint64_t)tail[13]) << 40; // falls through
+  case 13: k2 ^= ((uint64_t)tail[12]) << 32; // falls through
+  case 12: k2 ^= ((uint64_t)tail[11]) << 24; // falls through
+  case 11: k2 ^= ((uint64_t)tail[10]) << 16; // falls through
+  case 10: k2 ^= ((uint64_t)tail[ 9]) << 8;  // falls through
   case  9: k2 ^= ((uint64_t)tail[ 8]) << 0;
            k2 *= c2; k2  = ROTL64(k2,33); k2 *= c1; out.h2 ^= k2;
-           //@suppress("No break at end of case")
-  case  8: k1 ^= ((uint64_t)tail[ 7]) << 56; //@suppress("No break at end of case")
-  case  7: k1 ^= ((uint64_t)tail[ 6]) << 48; //@suppress("No break at end of case")
-  case  6: k1 ^= ((uint64_t)tail[ 5]) << 40; //@suppress("No break at end of case")
-  case  5: k1 ^= ((uint64_t)tail[ 4]) << 32; //@suppress("No break at end of case")
-  case  4: k1 ^= ((uint64_t)tail[ 3]) << 24; //@suppress("No break at end of case")
-  case  3: k1 ^= ((uint64_t)tail[ 2]) << 16; //@suppress("No break at end of case")
-  case  2: k1 ^= ((uint64_t)tail[ 1]) << 8; //@suppress("No break at end of case")
+           // falls through
+  case  8: k1 ^= ((uint64_t)tail[ 7]) << 56; // falls through
+  case  7: k1 ^= ((uint64_t)tail[ 6]) << 48; // falls through
+  case  6: k1 ^= ((uint64_t)tail[ 5]) << 40; // falls through
+  case  5: k1 ^= ((uint64_t)tail[ 4]) << 32; // falls through
+  case  4: k1 ^= ((uint64_t)tail[ 3]) << 24; // falls through
+  case  3: k1 ^= ((uint64_t)tail[ 2]) << 16; // falls through
+  case  2: k1 ^= ((uint64_t)tail[ 1]) << 8; // falls through
   case  1: k1 ^= ((uint64_t)tail[ 0]) << 0;
            k1 *= c1; k1  = ROTL64(k1,31); k1 *= c2; out.h1 ^= k1;
   };
diff --git a/be/src/thirdparty/datasketches/README.md b/be/src/thirdparty/datasketches/README.md
index 51e1d57..d5c56ce 100644
--- a/be/src/thirdparty/datasketches/README.md
+++ b/be/src/thirdparty/datasketches/README.md
@@ -1,14 +1,15 @@
-The content of this folder imports the functionality needed for HLL approximate
-algorithm from Apache DataSketches by copying the necessary files from that
-project into this folder. Note, that the original structure of files was
-changed during this process as originally hll/ and common/ libraries were
-both affected but I copied these into the same directory so that Impala can
-compile them without rewriting the include paths in the files themselves. Also
-note, that not the whole common/ directory was copied just the files needed for
-HLL.
+The content of this folder imports the functionality needed for HLL and KLL
+approximate algorithms from Apache DataSketches by copying the necessary files
+from that project into this folder. Note, that the original structure of files was
+changed during this process as originally the following folders were affected:
+  hll/include/
+  kll/include/
+  common/include/
+I copied the content of these folders into the same directory so that Impala
+can compile them without rewriting the include paths in the files themselves.
 
 The git hash of the snapshot I used as a source for the files:
-a6265b307a03085abe26c20413fdbf7d7a5eaf29
+c67d92faad3827932ca3b5d864222e64977f2c20
 
 Browse the source files here:
 https://github.com/apache/incubator-datasketches-cpp
diff --git a/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp b/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp
new file mode 100644
index 0000000..06ab484
--- /dev/null
+++ b/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+#define _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+
+#include <cmath>
+#include <stdexcept>
+
+namespace datasketches {
+
+/**
+ * Confidence intervals for binomial proportions.
+ *
+ * <p>This class computes an approximation to the Clopper-Pearson confidence interval
+ * for a binomial proportion. Exact Clopper-Pearson intervals are strictly
+ * conservative, but these approximations are not.</p>
+ *
+ * <p>The main inputs are numbers <i>n</i> and <i>k</i>, which are not the same as other things
+ * that are called <i>n</i> and <i>k</i> in our sketching library. There is also a third
+ * parameter, numStdDev, that specifies the desired confidence level.</p>
+ * <ul>
+ * <li><i>n</i> is the number of independent randomized trials. It is given and therefore known.
+ * </li>
+ * <li><i>p</i> is the probability of a trial being a success. It is unknown.</li>
+ * <li><i>k</i> is the number of trials (out of <i>n</i>) that turn out to be successes. It is
+ * a random variable governed by a binomial distribution. After any given
+ * batch of <i>n</i> independent trials, the random variable <i>k</i> has a specific
+ * value which is observed and is therefore known.</li>
+ * <li><i>pHat</i> = <i>k</i> / <i>n</i> is an unbiased estimate of the unknown success
+ * probability <i>p</i>.</li>
+ * </ul>
+ *
+ * <p>Alternatively, consider a coin with unknown heads probability <i>p</i>. Where
+ * <i>n</i> is the number of independent flips of that coin, and <i>k</i> is the number
+ * of times that the coin comes up heads during a given batch of <i>n</i> flips.
+ * This class computes a frequentist confidence interval [lowerBoundOnP, upperBoundOnP] for the
+ * unknown <i>p</i>.</p>
+ *
+ * <p>Conceptually, the desired confidence level is specified by a tail probability delta.</p>
+ *
+ * <p>Ideally, over a large ensemble of independent batches of trials,
+ * the fraction of batches in which the true <i>p</i> lies below lowerBoundOnP would be at most
+ * delta, and the fraction of batches in which the true <i>p</i> lies above upperBoundOnP
+ * would also be at most delta.
+ *
+ * <p>Setting aside the philosophical difficulties attaching to that statement, it isn't quite
+ * true because we are approximating the Clopper-Pearson interval.</p>
+ *
+ * <p>Finally, we point out that in this class's interface, the confidence parameter delta is
+ * not specified directly, but rather through a "number of standard deviations" numStdDev.
+ * The library effectively converts that to a delta via delta = normalCDF (-1.0 * numStdDev).</p>
+ *
+ * <p>It is perhaps worth emphasizing that the library is NOT merely adding and subtracting
+ * numStdDev standard deviations to the estimate. It is doing something better, that to some
+ * extent accounts for the fact that the binomial distribution has a non-gaussian shape.</p>
+ *
+ * <p>In particular, it is using an approximation to the inverse of the incomplete beta function
+ * that appears as formula 26.5.22 on page 945 of the "Handbook of Mathematical Functions"
+ * by Abramowitz and Stegun.</p>
+ *
+ * @author Kevin Lang
+ * @author Jon Malkin
+ */
+class bounds_binomial_proportions { // confidence intervals for binomial proportions
+
+public:
+  /**
+   * Computes lower bound of approximate Clopper-Pearson confidence interval for a binomial
+   * proportion.
+   *
+   * <p>Implementation Notes:<br>
+   * The approximateLowerBoundOnP is defined with respect to the right tail of the binomial
+   * distribution.</p>
+   * <ul>
+   * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,k,n</i></sub>bino(<i>j;n,p</i>)
+   * = delta.</li>
+   * <li>We now restate that in terms of the left tail.</li>
+   * <li>We want to solve for the p for which sum<sub><i>j,0,(k-1)</i></sub>bino(<i>j;n,p</i>)
+   * = 1 - delta.</li>
+   * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+   * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k+1,k)</i></sub> = 1 - delta.</li>
+   * <li>We specify 1-delta via numStdDevs through the right tail of the standard normal
+   * distribution.</li>
+   * <li>Smaller values of numStdDevs correspond to bigger values of 1-delta and hence to smaller
+   * values of delta. In fact, usefully small values of delta correspond to negative values of
+   * numStdDevs.</li>
+   * <li>return <i>p</i> = 1-<i>x</i>.</li>
+   * </ul>
+   *
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+   * @param num_std_devs the number of standard deviations defining the confidence interval
+   * @return the lower bound of the approximate Clopper-Pearson confidence interval for the
+   * unknown success probability.
+   */
+  static inline double approximate_lower_bound_on_p(long n, long k, double num_std_devs) {
+    check_inputs(n, k);
+    if (n == 0) { return 0.0; } // the coin was never flipped, so we know nothing
+    else if (k == 0) { return 0.0; }
+    else if (k == 1) { return (exact_lower_bound_on_p_k_eq_1(n, delta_of_num_stdevs(num_std_devs))); }
+    else if (k == n) { return (exact_lower_bound_on_p_k_eq_n(n, delta_of_num_stdevs(num_std_devs))); }
+    else {
+      double x = abramowitz_stegun_formula_26p5p22((n - k) + 1, k, (-1.0 * num_std_devs));
+      return (1.0 - x); // which is p
+    }
+  }
+
+  /**
+   * Computes upper bound of approximate Clopper-Pearson confidence interval for a binomial
+   * proportion.
+   *
+   * <p>Implementation Notes:<br>
+   * The approximateUpperBoundOnP is defined with respect to the left tail of the binomial
+   * distribution.</p>
+   * <ul>
+   * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,0,k</i></sub>bino(<i>j;n,p</i>)
+   * = delta.</li>
+   * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+   * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k,k+1)</i></sub> = delta.</li>
+   * <li>We specify delta via numStdDevs through the right tail of the standard normal
+   * distribution.</li>
+   * <li>Bigger values of numStdDevs correspond to smaller values of delta.</li>
+   * <li>return <i>p</i> = 1-<i>x</i>.</li>
+   * </ul>
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed <i>n</i>.
+   * @param num_std_devs the number of standard deviations defining the confidence interval
+   * @return the upper bound of the approximate Clopper-Pearson confidence interval for the
+   * unknown success probability.
+   */
+  static inline double approximate_upper_bound_on_p(long n, long k, double num_std_devs) {
+    check_inputs(n, k);
+    if (n == 0) { return 1.0; } // the coin was never flipped, so we know nothing
+    else if (k == n) { return 1.0; }
+    else if (k == (n - 1)) {
+      return (exactU_upper_bound_on_p_k_eq_minusone(n, delta_of_num_stdevs(num_std_devs)));
+    }
+    else if (k == 0) {
+      return (exact_upper_bound_on_p_k_eq_zero(n, delta_of_num_stdevs(num_std_devs)));
+    }
+    else {
+      double x = abramowitz_stegun_formula_26p5p22(n - k, k + 1, num_std_devs);
+      return (1.0 - x); // which is p
+    }
+  }
+
+  /**
+   * Computes an estimate of an unknown binomial proportion.
+   * @param n is the number of trials. Must be non-negative.
+   * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+   * @return the estimate of the unknown binomial proportion.
+   */
+  static inline double estimate_unknown_p(long n, long k) {
+    check_inputs(n, k);
+    if (n == 0) { return 0.5; } // the coin was never flipped, so we know nothing
+    else { return ((double) k / (double) n); }
+  }
+
+  /**
+   * Computes an approximation to the erf() function.
+   * @param x is the input to the erf function
+   * @return returns erf(x), accurate to roughly 7 decimal digits.
+   */
+  static inline double erf(double x) {
+    if (x < 0.0) { return (-1.0 * (erf_of_nonneg(-1.0 * x))); }
+    else { return (erf_of_nonneg(x)); }
+  }
+
+  /**
+   * Computes an approximation to normal_cdf(x).
+   * @param x is the input to the normal_cdf function
+   * @return returns the approximation to normalCDF(x).
+   */
+   static inline double normal_cdf(double x) {
+    return (0.5 * (1.0 + (erf(x / (sqrt(2.0))))));
+  }
+
+private:
+  static inline void check_inputs(long n, long k) {
+    if (n < 0) { throw std::invalid_argument("N must be non-negative"); }
+    if (k < 0) { throw std::invalid_argument("K must be non-negative"); }
+    if (k > n) { throw std::invalid_argument("K cannot exceed N"); }
+  }
+
+  //@formatter:off
+  // Abramowitz and Stegun formula 7.1.28, p. 88; Claims accuracy of about 7 decimal digits */
+  static inline double erf_of_nonneg(double x) {
+    // The constants that appear below, formatted for easy checking against the book.
+    //    a1 = 0.07052 30784
+    //    a3 = 0.00927 05272
+    //    a5 = 0.00027 65672
+    //    a2 = 0.04228 20123
+    //    a4 = 0.00015 20143
+    //    a6 = 0.00004 30638
+    static const double a1 = 0.0705230784;
+    static const double a3 = 0.0092705272;
+    static const double a5 = 0.0002765672;
+    static const double a2 = 0.0422820123;
+    static const double a4 = 0.0001520143;
+    static const double a6 = 0.0000430638;
+    const double x2 = x * x; // x squared, x cubed, etc.
+    const double x3 = x2 * x;
+    const double x4 = x2 * x2;
+    const double x5 = x2 * x3;
+    const double x6 = x3 * x3;
+    const double sum = ( 1.0
+                 + (a1 * x)
+                 + (a2 * x2)
+                 + (a3 * x3)
+                 + (a4 * x4)
+                 + (a5 * x5)
+                 + (a6 * x6) );
+    const double sum2 = sum * sum; // raise the sum to the 16th power
+    const double sum4 = sum2 * sum2;
+    const double sum8 = sum4 * sum4;
+    const double sum16 = sum8 * sum8;
+    return (1.0 - (1.0 / sum16));
+  }
+
+  static inline double delta_of_num_stdevs(double kappa) {
+    return (normal_cdf(-1.0 * kappa));
+  }
+
+  //@formatter:on
+  // Formula 26.5.22 on page 945 of Abramowitz & Stegun, which is an approximation
+  // of the inverse of the incomplete beta function I_x(a,b) = delta
+  // viewed as a scalar function of x.
+  // In other words, we specify delta, and it gives us x (with a and b held constant).
+  // However, delta is specified in an indirect way through yp which
+  // is the number of stdDevs that leaves delta probability in the right
+  // tail of a standard gaussian distribution.
+
+  // We point out that the variable names correspond to those in the book,
+  // and it is worth keeping it that way so that it will always be easy to verify
+  // that the formula was typed in correctly.
+
+  static inline double abramowitz_stegun_formula_26p5p22(double a, double b,
+      double yp) {
+    const double b2m1 = (2.0 * b) - 1.0;
+    const double a2m1 = (2.0 * a) - 1.0;
+    const double lambda = ((yp * yp) - 3.0) / 6.0;
+    const double htmp = (1.0 / a2m1) + (1.0 / b2m1);
+    const double h = 2.0 / htmp;
+    const double term1 = (yp * (sqrt(h + lambda))) / h;
+    const double term2 = (1.0 / b2m1) - (1.0 / a2m1);
+    const double term3 = (lambda + (5.0 / 6.0)) - (2.0 / (3.0 * h));
+    const double w = term1 - (term2 * term3);
+    const double xp = a / (a + (b * (exp(2.0 * w))));
+    return xp;
+  }
+
+  // Formulas for some special cases.
+
+  static inline double exact_upper_bound_on_p_k_eq_zero(double n, double delta) {
+    return (1.0 - pow(delta, (1.0 / n)));
+  }
+
+  static inline double exact_lower_bound_on_p_k_eq_n(double n, double delta) {
+    return (pow(delta, (1.0 / n)));
+  }
+
+  static inline double exact_lower_bound_on_p_k_eq_1(double n, double delta) {
+    return (1.0 - pow((1.0 - delta), (1.0 / n)));
+  }
+
+  static inline double exactU_upper_bound_on_p_k_eq_minusone(double n, double delta) {
+    return (pow((1.0 - delta), (1.0 / n)));
+  }
+
+};
+
+}
+
+#endif // _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp b/be/src/thirdparty/datasketches/common_defs.hpp
similarity index 66%
copy from be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
copy to be/src/thirdparty/datasketches/common_defs.hpp
index 67563c5..7a7b40d 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
+++ b/be/src/thirdparty/datasketches/common_defs.hpp
@@ -17,24 +17,20 @@
  * under the License.
  */
 
-#ifndef _COMPOSITEINTERPOLATIONXTABLE_HPP_
-#define _COMPOSITEINTERPOLATIONXTABLE_HPP_
+#ifndef _COMMON_DEFS_HPP_
+#define _COMMON_DEFS_HPP_
 
+#include <cstdint>
+#include <string>
 #include <memory>
 
 namespace datasketches {
 
-template<typename A = std::allocator<char>>
-class CompositeInterpolationXTable {
-  public:
-    static const int get_y_stride(int logK);
+static const uint64_t DEFAULT_SEED = 9001;
 
-    static const double* const get_x_arr(int logK);
-    static const int get_x_arr_length(int logK);
-};
+template<typename A> using AllocChar = typename std::allocator_traits<A>::template rebind_alloc<char>;
+template<typename A> using string = std::basic_string<char, std::char_traits<char>, AllocChar<A>>;
 
-}
+} // namespace
 
-#include "CompositeInterpolationXTable-internal.hpp"
-
-#endif /* _COMPOSITEINTERPOLATIONXTABLE_HPP_ */
\ No newline at end of file
+#endif // _COMMON_DEFS_HPP_
diff --git a/be/src/thirdparty/datasketches/count_zeros.hpp b/be/src/thirdparty/datasketches/count_zeros.hpp
new file mode 100644
index 0000000..0c9f6b4
--- /dev/null
+++ b/be/src/thirdparty/datasketches/count_zeros.hpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _COUNT_ZEROS_HPP_
+#define _COUNT_ZEROS_HPP_
+
+#include <cstdint>
+
+#include <stdio.h>
+
+namespace datasketches {
+
+static const uint8_t byte_leading_zeros_table[256] = {
+    8, 7, 6, 6, 5, 5, 5, 5, 4, 4, 4, 4, 4, 4, 4, 4,
+    3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+    2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+    1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+};
+
+static const uint8_t byte_trailing_zeros_table[256] = {
+    8, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+    4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
+};
+
+static const uint64_t FCLZ_MASK_56 = 0x00ffffffffffffff;
+static const uint64_t FCLZ_MASK_48 = 0x0000ffffffffffff;
+static const uint64_t FCLZ_MASK_40 = 0x000000ffffffffff;
+static const uint64_t FCLZ_MASK_32 = 0x00000000ffffffff;
+static const uint64_t FCLZ_MASK_24 = 0x0000000000ffffff;
+static const uint64_t FCLZ_MASK_16 = 0x000000000000ffff;
+static const uint64_t FCLZ_MASK_08 = 0x00000000000000ff;
+
+static inline uint8_t count_leading_zeros_in_u64(uint64_t input) {
+  if (input > FCLZ_MASK_56)
+    return      byte_leading_zeros_table[(input >> 56) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_48)
+    return  8 + byte_leading_zeros_table[(input >> 48) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_40)
+    return 16 + byte_leading_zeros_table[(input >> 40) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_32)
+    return 24 + byte_leading_zeros_table[(input >> 32) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_24)
+    return 32 + byte_leading_zeros_table[(input >> 24) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_16)
+    return 40 + byte_leading_zeros_table[(input >> 16) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_08)
+    return 48 + byte_leading_zeros_table[(input >>  8) & FCLZ_MASK_08];
+  if (true)
+    return 56 + byte_leading_zeros_table[(input      ) & FCLZ_MASK_08];
+}
+
+static inline uint8_t count_trailing_zeros_in_u32(uint32_t input) {
+  for (int i = 0; i < 4; i++) {
+    const int byte = input & 0xff;
+    if (byte != 0) return (i << 3) + byte_trailing_zeros_table[byte];
+    input >>= 8;
+  }
+  return 32;
+}
+
+static inline uint8_t count_trailing_zeros_in_u64(uint64_t input) {
+  for (int i = 0; i < 8; i++) {
+    const int byte = input & 0xff;
+    if (byte != 0) return (i << 3) + byte_trailing_zeros_table[byte];
+    input >>= 8;
+  }
+  return 64;
+}
+
+} /* namespace datasketches */
+
+#endif // _COUNT_ZEROS_HPP_
diff --git a/be/src/thirdparty/datasketches/hll.hpp b/be/src/thirdparty/datasketches/hll.hpp
index 31db794..c76ba48 100644
--- a/be/src/thirdparty/datasketches/hll.hpp
+++ b/be/src/thirdparty/datasketches/hll.hpp
@@ -20,6 +20,7 @@
 #ifndef _HLL_HPP_
 #define _HLL_HPP_
 
+#include "common_defs.hpp"
 #include "HllUtil.hpp"
 
 #include <memory>
@@ -194,31 +195,16 @@ class hll_sketch_alloc final {
 
     /**
      * Human readable summary with optional detail
-     * @param os std::ostram to which the summary is written
      * @param summary if true, output the sketch summary
      * @param detail if true, output the internal data array
      * @param auxDetail if true, output the internal Aux array, if it exists.
      * @param all if true, outputs all entries including empty ones
      * @return human readable string with optional detail.
      */
-    std::ostream& to_string(std::ostream& os,
-                            bool summary = true,
-                            bool detail = false,
-                            bool aux_detail = false,
-                            bool all = false) const;
-
-    /**
-     * Human readable summary with optional detail
-     * @param summary if true, output the sketch summary
-     * @param detail if true, output the internal data array
-     * @param auxDetail if true, output the internal Aux array, if it exists.
-     * @param all if true, outputs all entries including empty ones
-     * @return human readable string with optional detail.
-     */
-    std::string to_string(bool summary = true,
-                          bool detail = false,
-                          bool aux_detail = false,
-                          bool all = false) const;
+    string<A> to_string(bool summary = true,
+                        bool detail = false,
+                        bool aux_detail = false,
+                        bool all = false) const;
 
     /**
      * Present the given std::string as a potential unique item.
@@ -547,63 +533,6 @@ class hll_union_alloc {
      */
     hll_sketch_alloc<A> get_result(target_hll_type tgt_type = HLL_4) const;
 
-    typedef vector_u8<A> vector_bytes; // alias for users
-
-    /**
-     * Serializes the sketch to a byte array, compacting data structures
-     * where feasible to eliminate unused storage in the serialized image.
-     * @param header_size_bytes Allows for PostgreSQL integration
-     */
-    vector_bytes serialize_compact() const;
-  
-    /**
-     * Serializes the sketch to a byte array, retaining all internal 
-     * data structures in their current form.
-     */
-    vector_bytes serialize_updatable() const;
-
-    /**
-     * Serializes the sketch to an ostream, compacting data structures
-     * where feasible to eliminate unused storage in the serialized image.
-     * @param os std::ostream to use for output.
-     */
-    void serialize_compact(std::ostream& os) const;
-
-    /**
-     * Serializes the sketch to an ostream, retaining all internal data
-     * structures in their current form.
-     * @param os std::ostream to use for output.
-     */
-    void serialize_updatable(std::ostream& os) const;
-
-    /**
-     * Human readable summary with optional detail
-     * @param os std::ostram to which the summary is written
-     * @param summary if true, output the sketch summary
-     * @param detail if true, output the internal data array
-     * @param auxDetail if true, output the internal Aux array, if it exists.
-     * @param all if true, outputs all entries including empty ones
-     * @return human readable string with optional detail.
-     */
-    std::ostream& to_string(std::ostream& os,
-                            bool summary = true,
-                            bool detail = false,
-                            bool aux_Detail = false,
-                            bool all = false) const;
-  
-    /**
-     * Human readable summary with optional detail
-     * @param summary if true, output the sketch summary
-     * @param detail if true, output the internal data array
-     * @param auxDetail if true, output the internal Aux array, if it exists.
-     * @param all if true, outputs all entries including empty ones
-     * @return human readable string with optional detail.
-     */
-    std::string to_string(bool summary = true,
-                          bool detail = false,
-                          bool aux_detail = false,
-                          bool all = false) const;
-
     /**
      * Update this union operator with the given sketch.
      * @param The given sketch.
@@ -742,12 +671,6 @@ class hll_union_alloc {
     hll_sketch_alloc<A> gadget;
 };
 
-template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_sketch_alloc<A>& sketch);
-
-template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_union_alloc<A>& union_in);
-
 /// convenience alias for hll_sketch with default allocator
 typedef hll_sketch_alloc<> hll_sketch;
 
diff --git a/be/src/thirdparty/datasketches/kll_helper.hpp b/be/src/thirdparty/datasketches/kll_helper.hpp
new file mode 100644
index 0000000..4857f51
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_helper.hpp
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_HELPER_HPP_
+#define KLL_HELPER_HPP_
+
+#include <random>
+#include <stdexcept>
+#include <chrono>
+
+namespace datasketches {
+
+static std::independent_bits_engine<std::mt19937, 1, uint32_t> random_bit(std::chrono::system_clock::now().time_since_epoch().count());
+
+#ifdef KLL_VALIDATION
+extern uint32_t kll_next_offset;
+#endif
+
+// 0 <= power <= 30
+static const uint64_t powers_of_three[] =  {1, 3, 9, 27, 81, 243, 729, 2187, 6561, 19683, 59049, 177147, 531441,
+1594323, 4782969, 14348907, 43046721, 129140163, 387420489, 1162261467,
+3486784401, 10460353203, 31381059609, 94143178827, 282429536481,
+847288609443, 2541865828329, 7625597484987, 22876792454961, 68630377364883,
+205891132094649};
+
+class kll_helper {
+  public:
+    static inline bool is_even(uint32_t value);
+    static inline bool is_odd(uint32_t value);
+    static inline uint8_t floor_of_log2_of_fraction(uint64_t numer, uint64_t denom);
+    static inline uint8_t ub_on_num_levels(uint64_t n);
+    static inline uint32_t compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels);
+    static inline uint32_t level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid);
+    static inline uint32_t int_cap_aux(uint16_t k, uint8_t depth);
+    static inline uint32_t int_cap_aux_aux(uint16_t k, uint8_t depth);
+    static inline uint64_t sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels);
+
+    /*
+     * This version is for floating point types
+     * Checks the sequential validity of the given array of values.
+     * They must be unique, monotonically increasing and not NaN.
+     */
+    template <typename T, typename C>
+    static typename std::enable_if<std::is_floating_point<T>::value, void>::type
+    validate_values(const T* values, uint32_t size) {
+      for (uint32_t i = 0; i < size ; i++) {
+        if (std::isnan(values[i])) {
+          throw std::invalid_argument("Values must not be NaN");
+        }
+        if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
+          throw std::invalid_argument("Values must be unique and monotonically increasing");
+        }
+      }
+    }
+    /*
+     * This version is for non-floating point types
+     * Checks the sequential validity of the given array of values.
+     * They must be unique and monotonically increasing.
+     */
+    template <typename T, typename C>
+    static typename std::enable_if<!std::is_floating_point<T>::value, void>::type
+    validate_values(const T* values, uint32_t size) {
+      for (uint32_t i = 0; i < size ; i++) {
+        if ((i < (size - 1)) && !(C()(values[i], values[i + 1]))) {
+          throw std::invalid_argument("Values must be unique and monotonically increasing");
+        }
+      }
+    }
+
+    template <typename T>
+    static void randomly_halve_down(T* buf, uint32_t start, uint32_t length);
+
+    template <typename T>
+    static void randomly_halve_up(T* buf, uint32_t start, uint32_t length);
+
+    // this version moves objects within the same buffer
+    // assumes that destination has initialized objects
+    // does not destroy the originals after the move
+    template <typename T, typename C>
+    static void merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c);
+
+    // this version is to merge from two different buffers into a third buffer
+    // initializes objects is the destination buffer
+    // moves objects from buf_a and destroys the originals
+    // copies objects from buf_b
+    template <typename T, typename C>
+    static void merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c);
+
+    struct compress_result {
+      uint8_t final_num_levels;
+      uint32_t final_capacity;
+      uint32_t final_num_items;
+    };
+
+    /*
+     * Here is what we do for each level:
+     * If it does not need to be compacted, then simply copy it over.
+     *
+     * Otherwise, it does need to be compacted, so...
+     *   Copy zero or one guy over.
+     *   If the level above is empty, halve up.
+     *   Else the level above is nonempty, so...
+     *        halve down, then merge up.
+     *   Adjust the boundaries of the level above.
+     *
+     * It can be proved that general_compress returns a sketch that satisfies the space constraints
+     * no matter how much data is passed in.
+     * All levels except for level zero must be sorted before calling this, and will still be
+     * sorted afterwards.
+     * Level zero is not required to be sorted before, and may not be sorted afterwards.
+     */
+    template <typename T, typename C>
+    static compress_result general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+            uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted);
+
+    template<typename T>
+    static void copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first);
+
+    template<typename T>
+    static void move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy);
+
+#ifdef KLL_VALIDATION
+  private:
+
+    static inline uint32_t deterministic_offset();
+#endif
+
+};
+
+} /* namespace datasketches */
+
+#include "kll_helper_impl.hpp"
+
+#endif // KLL_HELPER_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_helper_impl.hpp b/be/src/thirdparty/datasketches/kll_helper_impl.hpp
new file mode 100644
index 0000000..011a78d
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_helper_impl.hpp
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_HELPER_IMPL_HPP_
+#define KLL_HELPER_IMPL_HPP_
+
+#include <algorithm>
+
+namespace datasketches {
+
+bool kll_helper::is_even(uint32_t value) {
+  return (value & 1) == 0;
+}
+
+bool kll_helper::is_odd(uint32_t value) {
+  return (value & 1) > 0;
+}
+
+uint8_t kll_helper::floor_of_log2_of_fraction(uint64_t numer, uint64_t denom) {
+  if (denom > numer) return 0;
+  uint8_t count = 0;
+  while (true) {
+    denom <<= 1;
+    if (denom > numer) return count;
+    count++;
+  }
+}
+
+uint8_t kll_helper::ub_on_num_levels(uint64_t n) {
+  if (n == 0) return 1;
+  return 1 + floor_of_log2_of_fraction(n, 1);
+}
+
+uint32_t kll_helper::compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels) {
+  uint32_t total = 0;
+  for (uint8_t h = 0; h < num_levels; h++) {
+    total += level_capacity(k, num_levels, h, m);
+  }
+  return total;
+}
+
+uint32_t kll_helper::level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid) {
+  if (height >= numLevels) throw std::invalid_argument("height >= numLevels");
+  const uint8_t depth = numLevels - height - 1;
+  return std::max((uint32_t) min_wid, int_cap_aux(k, depth));
+}
+
+uint32_t kll_helper::int_cap_aux(uint16_t k, uint8_t depth) {
+  if (depth > 60) throw std::invalid_argument("depth > 60");
+  if (depth <= 30) return int_cap_aux_aux(k, depth);
+  const uint8_t half = depth / 2;
+  const uint8_t rest = depth - half;
+  const uint32_t tmp = int_cap_aux_aux(k, half);
+  return int_cap_aux_aux(tmp, rest);
+}
+
+uint32_t kll_helper::int_cap_aux_aux(uint16_t k, uint8_t depth) {
+  if (depth > 30) throw std::invalid_argument("depth > 30");
+  const uint64_t twok = k << 1; // for rounding, we pre-multiply by 2
+  const uint64_t tmp = (uint64_t) (((uint64_t) twok << depth) / powers_of_three[depth]);
+  const uint64_t result = (tmp + 1) >> 1; // then here we add 1 and divide by 2
+  if (result > k) throw std::logic_error("result > k");
+  return result;
+}
+
+uint64_t kll_helper::sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels) {
+  uint64_t total = 0;
+  uint64_t weight = 1;
+  for (uint8_t lvl = 0; lvl < num_levels; lvl++) {
+    total += weight * (levels[lvl + 1] - levels[lvl]);
+    weight *= 2;
+  }
+  return total;
+}
+
+template <typename T>
+void kll_helper::randomly_halve_down(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+  const uint32_t half_length = length / 2;
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  uint32_t j = start + offset;
+  for (uint32_t i = start; i < (start + half_length); i++) {
+    if (i != j) buf[i] = std::move(buf[j]);
+    j += 2;
+  }
+}
+
+template <typename T>
+void kll_helper::randomly_halve_up(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+  const uint32_t half_length = length / 2;
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  uint32_t j = (start + length) - 1 - offset;
+  for (uint32_t i = (start + length) - 1; i >= (start + half_length); i--) {
+    if (i != j) buf[i] = std::move(buf[j]);
+    j -= 2;
+  }
+}
+
+// this version moves objects within the same buffer
+// assumes that destination has initialized objects
+// does not destroy the originals after the move
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c) {
+  const uint32_t len_c = len_a + len_b;
+  const uint32_t lim_a = start_a + len_a;
+  const uint32_t lim_b = start_b + len_b;
+  const uint32_t lim_c = start_c + len_c;
+
+  uint32_t a = start_a;
+  uint32_t b = start_b;
+
+  for (uint32_t c = start_c; c < lim_c; c++) {
+    if (a == lim_a) {
+      if (b != c) buf[c] = std::move(buf[b]);
+      b++;
+    } else if (b == lim_b) {
+      if (a != c) buf[c] = std::move(buf[a]);
+      a++;
+    } else if (C()(buf[a], buf[b])) {
+      if (a != c) buf[c] = std::move(buf[a]);
+      a++;
+    } else {
+      if (b != c) buf[c] = std::move(buf[b]);
+      b++;
+    }
+  }
+  if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+// this version is to merge from two different buffers into a third buffer
+// initializes objects is the destination buffer
+// moves objects from buf_a and destroys the originals
+// copies objects from buf_b
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
+  const uint32_t len_c = len_a + len_b;
+  const uint32_t lim_a = start_a + len_a;
+  const uint32_t lim_b = start_b + len_b;
+  const uint32_t lim_c = start_c + len_c;
+
+  uint32_t a = start_a;
+  uint32_t b = start_b;
+
+  for (uint32_t c = start_c; c < lim_c; c++) {
+    if (a == lim_a) {
+      new (&buf_c[c]) T(buf_b[b++]);
+    } else if (b == lim_b) {
+      new (&buf_c[c]) T(std::move(buf_a[a]));
+      buf_a[a++].~T();
+    } else if (C()(buf_a[a], buf_b[b])) {
+      new (&buf_c[c]) T(std::move(buf_a[a]));
+      buf_a[a++].~T();
+    } else {
+      new (&buf_c[c]) T(buf_b[b++]);
+    }
+  }
+  if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+/*
+ * Here is what we do for each level:
+ * If it does not need to be compacted, then simply copy it over.
+ *
+ * Otherwise, it does need to be compacted, so...
+ *   Copy zero or one guy over.
+ *   If the level above is empty, halve up.
+ *   Else the level above is nonempty, so...
+ *        halve down, then merge up.
+ *   Adjust the boundaries of the level above.
+ *
+ * It can be proved that general_compress returns a sketch that satisfies the space constraints
+ * no matter how much data is passed in.
+ * All levels except for level zero must be sorted before calling this, and will still be
+ * sorted afterwards.
+ * Level zero is not required to be sorted before, and may not be sorted afterwards.
+ */
+template <typename T, typename C>
+kll_helper::compress_result kll_helper::general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+        uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted)
+{
+  if (num_levels_in == 0) throw std::invalid_argument("num_levels_in == 0"); // things are too weird if zero levels are allowed
+  const uint32_t starting_item_count = in_levels[num_levels_in] - in_levels[0];
+  uint8_t current_num_levels = num_levels_in;
+  uint32_t current_item_count = starting_item_count; // decreases with each compaction
+  uint32_t target_item_count = compute_total_capacity(k, m, current_num_levels); // increases if we add levels
+  bool done_yet = false;
+  out_levels[0] = 0;
+  uint8_t current_level = 0;
+  while (!done_yet) {
+
+    // If we are at the current top level, add an empty level above it for convenience,
+    // but do not increment num_levels until later
+    if (current_level == (current_num_levels - 1)) {
+      in_levels[current_level + 2] = in_levels[current_level + 1];
+    }
+
+    const auto raw_beg = in_levels[current_level];
+    const auto raw_lim = in_levels[current_level + 1];
+    const auto raw_pop = raw_lim - raw_beg;
+
+    if ((current_item_count < target_item_count) || (raw_pop < level_capacity(k, current_num_levels, current_level, m))) {
+      // move level over as is
+      // make sure we are not moving data upwards
+      if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move");
+      std::move(&items[raw_beg], &items[raw_lim], &items[out_levels[current_level]]);
+      out_levels[current_level + 1] = out_levels[current_level] + raw_pop;
+    } else {
+      // The sketch is too full AND this level is too full, so we compact it
+      // Note: this can add a level and thus change the sketches capacities
+
+      const auto pop_above = in_levels[current_level + 2] - raw_lim;
+      const bool odd_pop = is_odd(raw_pop);
+      const auto adj_beg = odd_pop ? 1 + raw_beg : raw_beg;
+      const auto adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+      const auto half_adj_pop = adj_pop / 2;
+
+      if (odd_pop) { // move one guy over
+        items[out_levels[current_level]] = std::move(items[raw_beg]);
+        out_levels[current_level + 1] = out_levels[current_level] + 1;
+      } else { // even number of items
+        out_levels[current_level + 1] = out_levels[current_level];
+      }
+
+      // level zero might not be sorted, so we must sort it if we wish to compact it
+      if ((current_level == 0) && !is_level_zero_sorted) {
+        std::sort(&items[adj_beg], &items[adj_beg + adj_pop], C());
+      }
+
+      if (pop_above == 0) { // Level above is empty, so halve up
+        randomly_halve_up(items, adj_beg, adj_pop);
+      } else { // Level above is nonempty, so halve down, then merge up
+        randomly_halve_down(items, adj_beg, adj_pop);
+        merge_sorted_arrays<T, C>(items, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
+      }
+
+      // track the fact that we just eliminated some data
+      current_item_count -= half_adj_pop;
+
+      // adjust the boundaries of the level above
+      in_levels[current_level + 1] = in_levels[current_level + 1] - half_adj_pop;
+
+      // increment num_levels if we just compacted the old top level
+      // this creates some more capacity (the size of the new bottom level)
+      if (current_level == (current_num_levels - 1)) {
+        current_num_levels++;
+        target_item_count += level_capacity(k, current_num_levels, 0, m);
+      }
+
+    } // end of code for compacting a level
+
+    // determine whether we have processed all levels yet (including any new levels that we created)
+
+    if (current_level == (current_num_levels - 1)) done_yet = true;
+    current_level++;
+  } // end of loop over levels
+
+  if ((out_levels[current_num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
+
+  for (uint32_t i = current_item_count; i < starting_item_count; i++) items[i].~T();
+
+  compress_result result;
+  result.final_num_levels = current_num_levels;
+  result.final_capacity = target_item_count;
+  result.final_num_items = current_item_count;
+  return result;
+}
+
+template<typename T>
+void kll_helper::copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
+  while (src_first != src_last) {
+    new (&dst[dst_first++]) T(src[src_first++]);
+  }
+}
+
+template<typename T>
+void kll_helper::move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy) {
+  while (src_first != src_last) {
+    new (&dst[dst_first++]) T(std::move(src[src_first]));
+    if (destroy) src[src_first].~T();
+    src_first++;
+  }
+}
+
+#ifdef KLL_VALIDATION
+uint32_t kll_helper::deterministic_offset() {
+  const uint32_t result(kll_next_offset);
+  kll_next_offset = 1 - kll_next_offset;
+  return result;
+}
+#endif
+
+} /* namespace datasketches */
+
+#endif // KLL_HELPER_IMPL_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp b/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp
new file mode 100644
index 0000000..f77071e
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_QUANTILE_CALCULATOR_HPP_
+#define KLL_QUANTILE_CALCULATOR_HPP_
+
+#include <memory>
+
+namespace datasketches {
+
+template <typename T, typename C, typename A>
+class kll_quantile_calculator {
+  typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
+  typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
+  public:
+    // assumes that all levels are sorted including level 0
+    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
+    ~kll_quantile_calculator();
+    T get_quantile(double fraction) const;
+
+  private:
+    uint64_t n_;
+    T* items_;
+    uint64_t* weights_;
+    uint32_t* levels_;
+    uint8_t levels_size_;
+    uint8_t num_levels_;
+
+    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    T approximately_answer_positional_query(uint64_t pos) const;
+    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+    static uint64_t pos_of_phi(double phi, uint64_t n);
+    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
+    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
+    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
+};
+
+} /* namespace datasketches */
+
+#include "kll_quantile_calculator_impl.hpp"
+
+#endif // KLL_QUANTILE_CALCULATOR_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp b/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp
new file mode 100644
index 0000000..d4f4c04
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+#define KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+
+#include <memory>
+#include <cmath>
+
+#include "kll_helper.hpp"
+
+namespace datasketches {
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
+  n_ = n;
+  const uint32_t num_items = levels[num_levels] - levels[0];
+  items_ = A().allocate(num_items);
+  weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
+  levels_size_ = num_levels + 1;
+  levels_ = AllocU32().allocate(levels_size_);
+  populate_from_sketch(items, num_items, levels, num_levels);
+  blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
+  convert_to_preceding_cummulative(weights_, num_items + 1);
+}
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
+  const uint32_t num_items = levels_[num_levels_] - levels_[0];
+  for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
+  A().deallocate(items_, num_items);
+  AllocU64().deallocate(weights_, num_items + 1);
+  AllocU32().deallocate(levels_, levels_size_);
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
+  return approximately_answer_positional_query(pos_of_phi(fraction, n_));
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
+  uint8_t src_level = 0;
+  uint8_t dst_level = 0;
+  uint64_t weight = 1;
+  uint32_t offset = levels[0];
+  while (src_level < num_levels) {
+    const uint32_t from_index(levels[src_level] - offset);
+    const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
+    if (from_index < to_index) { // skip empty levels
+      std::fill(&weights_[from_index], &weights_[to_index], weight);
+      levels_[dst_level] = from_index;
+      levels_[dst_level + 1] = to_index;
+      dst_level++;
+    }
+    src_level++;
+    weight *= 2;
+  }
+  weights_[num_items] = 0;
+  num_levels_ = dst_level;
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
+  if (pos >= n_) throw std::logic_error("position out of range");
+  const uint32_t weights_size(levels_[num_levels_] + 1);
+  const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
+  return items_[index];
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
+  uint64_t subtotal(0);
+  for (uint32_t i = 0; i < weights_size; i++) {
+    const uint32_t new_subtotal = subtotal + weights[i];
+    weights[i] = subtotal;
+    subtotal = new_subtotal;
+  }
+}
+
+template <typename T, typename C, typename A>
+uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
+  const uint64_t pos = std::floor(phi * n);
+  return (pos == n) ? n - 1 : pos;
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
+  if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
+  const uint32_t nominal_length(weights_size - 1);
+  if (pos < weights[0]) throw std::logic_error("position too small");
+  if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
+  return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+  if (l + 1 == r) {
+    return l;
+  }
+  const uint32_t m(l + (r - l) / 2);
+  if (arr[m] <= pos) {
+    return search_for_chunk_containing_pos(arr, pos, m, r);
+  }
+  return search_for_chunk_containing_pos(arr, pos, l, m);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  if (num_levels == 1) return;
+
+  // move the input in preparation for the "ping-pong" reduction strategy
+  auto tmp_items_deleter = [num_items](T* ptr) {
+    for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
+    A().deallocate(ptr, num_items);
+  };
+  std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
+  kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
+  auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
+  std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
+  std::copy(weights, &weights[num_items], tmp_weights.get());
+  blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+  if (num_levels == 1) return;
+  const uint8_t num_levels_1 = num_levels / 2;
+  const uint8_t num_levels_2 = num_levels - num_levels_1;
+  if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
+  if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
+  const uint8_t starting_level_1 = starting_level;
+  const uint8_t starting_level_2 = starting_level + num_levels_1;
+  // swap roles of src and dst
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
+  tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
+  const auto from_index_1 = levels[starting_level_1];
+  const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
+  const auto from_index_2 = levels[starting_level_2];
+  const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
+  auto i_src_1 = from_index_1;
+  auto i_src_2 = from_index_2;
+  auto i_dst = from_index_1;
+
+  while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
+    if (C()(items_src[i_src_1], items_src[i_src_2])) {
+      items_dst[i_dst] = std::move(items_src[i_src_1]);
+      weights_dst[i_dst] = weights_src[i_src_1];
+      i_src_1++;
+    } else {
+      items_dst[i_dst] = std::move(items_src[i_src_2]);
+      weights_dst[i_dst] = weights_src[i_src_2];
+      i_src_2++;
+    }
+    i_dst++;
+  }
+  if (i_src_1 < to_index_1) {
+    std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
+    std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
+  } else if (i_src_2 < to_index_2) {
+    std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
+    std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
+  }
+}
+
+} /* namespace datasketches */
+
+#endif // KLL_QUANTILE_CALCULATOR_IMPL_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_sketch.hpp b/be/src/thirdparty/datasketches/kll_sketch.hpp
new file mode 100644
index 0000000..8b3409c
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_sketch.hpp
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_SKETCH_HPP_
+#define KLL_SKETCH_HPP_
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "kll_quantile_calculator.hpp"
+#include "common_defs.hpp"
+#include "serde.hpp"
+
+namespace datasketches {
+
+/*
+ * Implementation of a very compact quantiles sketch with lazy compaction scheme
+ * and nearly optimal accuracy per retained item.
+ * See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
+ *
+ * <p>This is a stochastic streaming sketch that enables near-real time analysis of the
+ * approximate distribution of values from a very large stream in a single pass, requiring only
+ * that the values are comparable.
+ * The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
+ * inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
+ * (Cumulative Distribution Function).
+ *
+ * <p>As of May 2020, this implementation produces serialized sketches which are binary-compatible
+ * with the equivalent Java implementation only when template parameter T = float
+ * (32-bit single precision values).
+ * 
+ * <p>Given an input stream of <i>N</i> numeric values, the <i>absolute rank</i> of any specific
+ * value is defined as its index <i>(0 to N-1)</i> in the hypothetical sorted stream of all
+ * <i>N</i> input values.
+ *
+ * <p>The <i>normalized rank</i> (<i>rank</i>) of any specific value is defined as its
+ * <i>absolute rank</i> divided by <i>N</i>.
+ * Thus, the <i>normalized rank</i> is a value between zero and one.
+ * In the documentation for this sketch <i>absolute rank</i> is never used so any
+ * reference to just <i>rank</i> should be interpreted to mean <i>normalized rank</i>.
+ *
+ * <p>This sketch is configured with a parameter <i>k</i>, which affects the size of the sketch
+ * and its estimation error.
+ *
+ * <p>The estimation error is commonly called <i>epsilon</i> (or <i>eps</i>) and is a fraction
+ * between zero and one. Larger values of <i>k</i> result in smaller values of epsilon.
+ * Epsilon is always with respect to the rank and cannot be applied to the
+ * corresponding values.
+ *
+ * <p>The relationship between the normalized rank and the corresponding values can be viewed
+ * as a two dimensional monotonic plot with the normalized rank on one axis and the
+ * corresponding values on the other axis. If the y-axis is specified as the value-axis and
+ * the x-axis as the normalized rank, then <i>y = get_quantile(x)</i> is a monotonically
+ * increasing function.
+ *
+ * <p>The functions <i>get_quantile(rank)</i> and get_quantiles(...) translate ranks into
+ * corresponding values. The functions <i>get_rank(value),
+ * get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
+ * (Probability Mass Function)</i> perform the opposite operation and translate values into ranks.
+ *
+ * <p>The <i>getPMF(...)</i> function has about 13 to 47% worse rank error (depending
+ * on <i>k</i>) than the other queries because the mass of each "bin" of the PMF has
+ * "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
+ * as the errors from the two edges can sometimes add.
+ *
+ * <p>The default <i>k</i> of 200 yields a "single-sided" epsilon of about 1.33% and a
+ * "double-sided" (PMF) epsilon of about 1.65%.
+ *
+ * <p>A <i>get_quantile(rank)</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>v = get_quantile(r)</i> where <i>r</i> is the rank between zero and one.</li>
+ * <li>The value <i>v</i> will be a value from the input stream.</li>
+ * <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
+ * stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%. Note that the
+ * error is on the rank, not the value.</li>
+ * </ul>
+ *
+ * <p>A <i>get_rank(value)</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>r = get_rank(v)</i> where <i>v</i> is a value between the min and max values of
+ * the input stream.</li>
+ * <li>Let <i>true_rank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
+ * stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Then <i>r - eps &le; trueRank &le; r + eps</i> with a confidence of 99%.</li>
+ * </ul>
+ *
+ * <p>A <i>get_PMF()</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>{r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
+ * between the min and max values of the input stream.
+ * <li>Let <i>mass<sub>i</sub> = estimated mass between v<sub>i</sub> and v<sub>i+1</sub></i>.</li>
+ * <li>Let <i>trueMass</i> be the true mass between the values of <i>v<sub>i</sub>,
+ * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
+ * <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
+ * <li>r(m+1) includes the mass of all points larger than vm.</li>
+ * </ul>
+ *
+ * <p>A <i>get_CDF(...)</i> query has the following guarantees;
+ * <ul>
+ * <li>Let <i>{r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
+ * between the min and max values of the input stream.
+ * <li>Let <i>mass<sub>i</sub> = r<sub>i+1</sub> - r<sub>i</sub></i>.</li>
+ * <li>Let <i>trueMass</i> be the true mass between the true ranks of <i>v<sub>i</sub>,
+ * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
+ * <li>then <i>mass - eps &le; trueMass &le; mass + eps</i> with a confidence of 99%.</li>
+ * <li>1 - r(m+1) includes the mass of all points larger than vm.</li>
+ * </ul>
+ *
+ * <p>From the above, it might seem like we could make some estimates to bound the
+ * <em>value</em> returned from a call to <em>get_quantile()</em>. The sketch, however, does not
+ * let us derive error bounds or confidences around values. Because errors are independent, we
+ * can approximately bracket a value as shown below, but there are no error estimates available.
+ * Additionally, the interval may be quite large for certain distributions.
+ * <ul>
+ * <li>Let <i>v = get_quantile(r)</i>, the estimated quantile value of rank <i>r</i>.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Let <i>v<sub>lo</sub></i> = estimated quantile value of rank <i>(r - eps)</i>.</li>
+ * <li>Let <i>v<sub>hi</sub></i> = estimated quantile value of rank <i>(r + eps)</i>.</li>
+ * <li>Then <i>v<sub>lo</sub> &le; v &le; v<sub>hi</sub></i>, with 99% confidence.</li>
+ * </ul>
+ *
+ * author Kevin Lang
+ * author Alexander Saydakov
+ * author Lee Rhodes
+ */
+
+template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
+template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
+template<typename A> using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+template<typename A> using vector_u32 = std::vector<uint32_t, AllocU32<A>>;
+template<typename A> using AllocD = typename std::allocator_traits<A>::template rebind_alloc<double>;
+template<typename A> using vector_d = std::vector<double, AllocD<A>>;
+
+template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
+class kll_sketch {
+  public:
+    static const uint8_t DEFAULT_M = 8;
+    static const uint16_t DEFAULT_K = 200;
+    static const uint16_t MIN_K = DEFAULT_M;
+    static const uint16_t MAX_K = (1 << 16) - 1;
+
+    explicit kll_sketch(uint16_t k = DEFAULT_K);
+    kll_sketch(const kll_sketch& other);
+    kll_sketch(kll_sketch&& other) noexcept;
+    ~kll_sketch();
+    kll_sketch& operator=(const kll_sketch& other);
+    kll_sketch& operator=(kll_sketch&& other);
+
+    /**
+     * Updates this sketch with the given data item.
+     * This method takes lvalue.
+     * @param value an item from a stream of items
+     */
+    void update(const T& value);
+
+    /**
+     * Updates this sketch with the given data item.
+     * This method takes rvalue.
+     * @param value an item from a stream of items
+     */
+    void update(T&& value);
+
+    /**
+     * Merges another sketch into this one.
+     * This method takes lvalue.
+     * @param other sketch to merge into this one
+     */
+    void merge(const kll_sketch& other);
+
+    /**
+     * Merges another sketch into this one.
+     * This method takes rvalue.
+     * @param other sketch to merge into this one
+     */
+    void merge(kll_sketch&& other);
+
+    /**
+     * Returns true if this sketch is empty.
+     * @return empty flag
+     */
+    bool is_empty() const;
+
+    /**
+     * Returns the length of the input stream.
+     * @return stream length
+     */
+    uint64_t get_n() const;
+
+    /**
+     * Returns the number of retained items (samples) in the sketch.
+     * @return the number of retained items
+     */
+    uint32_t get_num_retained() const;
+
+    /**
+     * Returns true if this sketch is in estimation mode.
+     * @return estimation mode flag
+     */
+    bool is_estimation_mode() const;
+
+    /**
+     * Returns the min value of the stream.
+     * For floating point types: if the sketch is empty this returns NaN.
+     * For other types: if the sketch is empty this throws runtime_error.
+     * @return the min value of the stream
+     */
+    T get_min_value() const;
+
+    /**
+     * Returns the max value of the stream.
+     * For floating point types: if the sketch is empty this returns NaN.
+     * For other types: if the sketch is empty this throws runtime_error.
+     * @return the max value of the stream
+     */
+    T get_max_value() const;
+
+    /**
+     * Returns an approximation to the value of the data item
+     * that would be preceded by the given fraction of a hypothetical sorted
+     * version of the input stream so far.
+     * <p>
+     * Note that this method has a fairly large overhead (microseconds instead of nanoseconds)
+     * so it should not be called multiple times to get different quantiles from the same
+     * sketch. Instead use get_quantiles(), which pays the overhead only once.
+     * <p>
+     * For floating point types: if the sketch is empty this returns NaN.
+     * For other types: if the sketch is empty this throws runtime_error.
+     *
+     * @param fraction the specified fractional position in the hypothetical sorted stream.
+     * These are also called normalized ranks or fractional ranks.
+     * If fraction = 0.0, the true minimum value of the stream is returned.
+     * If fraction = 1.0, the true maximum value of the stream is returned.
+     *
+     * @return the approximation to the value at the given fraction
+     */
+    T get_quantile(double fraction) const;
+
+    /**
+     * This is a more efficient multiple-query version of get_quantile().
+     * <p>
+     * This returns an array that could have been generated by using get_quantile() for each
+     * fractional rank separately, but would be very inefficient.
+     * This method incurs the internal set-up overhead once and obtains multiple quantile values in
+     * a single query. It is strongly recommend that this method be used instead of multiple calls
+     * to get_quantile().
+     *
+     * <p>If the sketch is empty this returns an empty vector.
+     *
+     * @param fractions given array of fractional positions in the hypothetical sorted stream.
+     * These are also called normalized ranks or fractional ranks.
+     * These fractions must be in the interval [0.0, 1.0], inclusive.
+     *
+     * @return array of approximations to the given fractions in the same order as given fractions
+     * in the input array.
+     */
+    std::vector<T, A> get_quantiles(const double* fractions, uint32_t size) const;
+
+    /**
+     * This is a multiple-query version of get_quantile() that allows the caller to
+     * specify the number of evenly-spaced fractional ranks.
+     *
+     * <p>If the sketch is empty this returns an empty vector.
+     *
+     * @param num an integer that specifies the number of evenly-spaced fractional ranks.
+     * This must be an integer greater than 0. A value of 1 will return the min value.
+     * A value of 2 will return the min and the max value. A value of 3 will return the min,
+     * the median and the max value, etc.
+     *
+     * @return array of approximations to the given number of evenly-spaced fractional ranks.
+     */
+    std::vector<T, A> get_quantiles(size_t num) const;
+
+    /**
+     * Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1,
+     * inclusive.
+     *
+     * <p>The resulting approximation has a probabilistic guarantee that can be obtained from the
+     * get_normalized_rank_error(false) function.
+     *
+     * <p>If the sketch is empty this returns NaN.
+     *
+     * @param value to be ranked
+     * @return an approximate rank of the given value
+     */
+    double get_rank(const T& value) const;
+
+    /**
+     * Returns an approximation to the Probability Mass Function (PMF) of the input stream
+     * given a set of split points (values).
+     *
+     * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+     * get_normalized_rank_error(true) function.
+     *
+     * <p>If the sketch is empty this returns an empty vector.
+     *
+     * @param split_points an array of <i>m</i> unique, monotonically increasing float values
+     * that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+     * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+     * exclusive of the right split point, with the exception that the last interval will include
+     * the maximum value.
+     * It is not necessary to include either the min or max values in these split points.
+     *
+     * @return an array of m+1 doubles each of which is an approximation
+     * to the fraction of the input stream values (the mass) that fall into one of those intervals.
+     * The definition of an "interval" is inclusive of the left split point and exclusive of the right
+     * split point, with the exception that the last interval will include maximum value.
+     */
+    vector_d<A> get_PMF(const T* split_points, uint32_t size) const;
+
+    /**
+     * Returns an approximation to the Cumulative Distribution Function (CDF), which is the
+     * cumulative analog of the PMF, of the input stream given a set of split points (values).
+     *
+     * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+     * get_normalized_rank_error(false) function.
+     *
+     * <p>If the sketch is empty this returns an empty vector.
+     *
+     * @param split_points an array of <i>m</i> unique, monotonically increasing float values
+     * that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+     * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+     * exclusive of the right split point, with the exception that the last interval will include
+     * the maximum value.
+     * It is not necessary to include either the min or max values in these split points.
+     *
+     * @return an array of m+1 double values, which are a consecutive approximation to the CDF
+     * of the input stream given the split_points. The value at array position j of the returned
+     * CDF array is the sum of the returned values in positions 0 through j of the returned PMF
+     * array.
+     */
+    vector_d<A> get_CDF(const T* split_points, uint32_t size) const;
+
+    /**
+     * Gets the approximate rank error of this sketch normalized as a fraction between zero and one.
+     * @param pmf if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+     * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+     * @return if pmf is true, returns the normalized rank error for the get_PMF() function.
+     * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+     */
+    double get_normalized_rank_error(bool pmf) const;
+
+    /**
+     * Computes size needed to serialize the current state of the sketch.
+     * This version is for fixed-size arithmetic types (integral and floating point).
+     * @return size in bytes needed to serialize this sketch
+     */
+    template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+    size_t get_serialized_size_bytes() const;
+
+    /**
+     * Computes size needed to serialize the current state of the sketch.
+     * This version is for all other types and can be expensive since every item needs to be looked at.
+     * @return size in bytes needed to serialize this sketch
+     */
+    template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+    size_t get_serialized_size_bytes() const;
+
+    /**
+     * This method serializes the sketch into a given stream in a binary form
+     * @param os output stream
+     */
+    void serialize(std::ostream& os) const;
+
+    // This is a convenience alias for users
+    // The type returned by the following serialize method
+    typedef vector_u8<A> vector_bytes;
+
+    /**
+     * This method serializes the sketch as a vector of bytes.
+     * An optional header can be reserved in front of the sketch.
+     * It is a blank space of a given size.
+     * This header is used in Datasketches PostgreSQL extension.
+     * @param header_size_bytes space to reserve in front of the sketch
+     */
+    vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
+    /**
+     * This method deserializes a sketch from a given stream.
+     * @param is input stream
+     * @return an instance of a sketch
+     */
+    static kll_sketch<T, C, S, A> deserialize(std::istream& is);
+
+    /**
+     * This method deserializes a sketch from a given array of bytes.
+     * @param bytes pointer to the array of bytes
+     * @param size the size of the array
+     * @return an instance of a sketch
+     */
+    static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size);
+
+    /*
+     * Gets the normalized rank error given k and pmf.
+     * k - the configuration parameter
+     * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+     * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+     * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
+     */
+    static double get_normalized_rank_error(uint16_t k, bool pmf);
+
+    /**
+     * Prints a summary of the sketch.
+     * @param print_levels if true include information about levels
+     * @param print_items if true include sketch data
+     */
+    string<A> to_string(bool print_levels = false, bool print_items = false) const;
+
+    class const_iterator;
+    const_iterator begin() const;
+    const_iterator end() const;
+
+    #ifdef KLL_VALIDATION
+    uint8_t get_num_levels() { return num_levels_; }
+    uint32_t* get_levels() { return levels_; }
+    T* get_items() { return items_; }
+    #endif
+
+  private:
+    /* Serialized sketch layout:
+     *  Adr:
+     *      ||    7    |   6   |    5   |    4   |    3   |    2    |    1   |      0       |
+     *  0   || unused  |   M   |--------K--------|  Flags |  FamID  | SerVer | PreambleInts |
+     *      ||   15    |   14  |   13   |   12   |   11   |   10    |    9   |      8       |
+     *  1   ||-----------------------------------N------------------------------------------|
+     *      ||   23    |   22  |   21   |   20   |   19   |    18   |   17   |      16      |
+     *  2   ||---------------data----------------|-unused-|numLevels|-------min K-----------|
+     */
+
+    static const size_t EMPTY_SIZE_BYTES = 8;
+    static const size_t DATA_START_SINGLE_ITEM = 8;
+    static const size_t DATA_START = 20;
+
+    static const uint8_t SERIAL_VERSION_1 = 1;
+    static const uint8_t SERIAL_VERSION_2 = 2;
+    static const uint8_t FAMILY = 15;
+
+    enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
+
+    static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
+    static const uint8_t PREAMBLE_INTS_FULL = 5;
+
+    uint16_t k_;
+    uint8_t m_; // minimum buffer "width"
+    uint16_t min_k_; // for error estimation after merging with different k
+    uint64_t n_;
+    uint8_t num_levels_;
+    vector_u32<A> levels_;
+    T* items_;
+    uint32_t items_size_;
+    T* min_value_;
+    T* max_value_;
+    bool is_level_zero_sorted_;
+
+    // for deserialization
+    class item_deleter;
+    class items_deleter;
+    kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
+        std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
+        std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted);
+
+    // common update code
+    inline void update_min_max(const T& value);
+    inline uint32_t internal_update();
+
+    // The following code is only valid in the special case of exactly reaching capacity while updating.
+    // It cannot be used while merging, while reducing k, or anything else.
+    void compress_while_updating(void);
+
+    uint8_t find_level_to_compact() const;
+    void add_empty_top_level_to_completely_full_sketch();
+    void sort_level_zero();
+    std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();
+    vector_d<A> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
+    void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+        const T* split_points, uint32_t size, double* buckets) const;
+    void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+        const T* split_points, uint32_t size, double* buckets) const;
+    template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
+    void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+    void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+    void assert_correct_total_weight() const;
+    uint32_t safe_level_size(uint8_t level) const;
+    uint32_t get_num_retained_above_level_zero() const;
+
+    static void check_m(uint8_t m);
+    static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte);
+    static void check_serial_version(uint8_t serial_version);
+    static void check_family_id(uint8_t family_id);
+
+    // implementations for floating point types
+    template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+    static TT get_invalid_value() {
+      return std::numeric_limits<TT>::quiet_NaN();
+    }
+
+    template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+    static inline bool check_update_value(TT value) {
+      return !std::isnan(value);
+    }
+
+    // implementations for all other types
+    template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+    static TT get_invalid_value() {
+      throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
+    }
+
+    template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+    static inline bool check_update_value(TT) {
+      return true;
+    }
+
+};
+
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+  friend class kll_sketch<T, C, S, A>;
+  const_iterator& operator++();
+  const_iterator& operator++(int);
+  bool operator==(const const_iterator& other) const;
+  bool operator!=(const const_iterator& other) const;
+  const std::pair<const T&, const uint64_t> operator*() const;
+private:
+  const T* items;
+  const uint32_t* levels;
+  const uint8_t num_levels;
+  uint32_t index;
+  uint8_t level;
+  uint64_t weight;
+  const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
+};
+
+} /* namespace datasketches */
+
+#include "kll_sketch_impl.hpp"
+
+#endif
diff --git a/be/src/thirdparty/datasketches/kll_sketch_impl.hpp b/be/src/thirdparty/datasketches/kll_sketch_impl.hpp
new file mode 100644
index 0000000..9b6c68e
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_sketch_impl.hpp
@@ -0,0 +1,1130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_SKETCH_IMPL_HPP_
+#define KLL_SKETCH_IMPL_HPP_
+
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+
+#include "memory_operations.hpp"
+#include "kll_helper.hpp"
+
+namespace datasketches {
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k):
+k_(k),
+m_(DEFAULT_M),
+min_k_(k),
+n_(0),
+num_levels_(1),
+levels_(2),
+items_(nullptr),
+items_size_(k_),
+min_value_(nullptr),
+max_value_(nullptr),
+is_level_zero_sorted_(false)
+{
+  if (k < MIN_K || k > MAX_K) {
+    throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
+  }
+  levels_[0] = levels_[1] = k;
+  items_ = A().allocate(items_size_);
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other):
+k_(other.k_),
+m_(other.m_),
+min_k_(other.min_k_),
+n_(other.n_),
+num_levels_(other.num_levels_),
+levels_(other.levels_),
+items_(nullptr),
+items_size_(other.items_size_),
+min_value_(nullptr),
+max_value_(nullptr),
+is_level_zero_sorted_(other.is_level_zero_sorted_)
+{
+  items_ = A().allocate(items_size_);
+  std::copy(&other.items_[levels_[0]], &other.items_[levels_[num_levels_]], &items_[levels_[0]]);
+  if (other.min_value_ != nullptr) min_value_ = new (A().allocate(1)) T(*other.min_value_);
+  if (other.max_value_ != nullptr) max_value_ = new (A().allocate(1)) T(*other.max_value_);
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(kll_sketch&& other) noexcept:
+k_(other.k_),
+m_(other.m_),
+min_k_(other.min_k_),
+n_(other.n_),
+num_levels_(other.num_levels_),
+levels_(std::move(other.levels_)),
+items_(other.items_),
+items_size_(other.items_size_),
+min_value_(other.min_value_),
+max_value_(other.max_value_),
+is_level_zero_sorted_(other.is_level_zero_sorted_)
+{
+  other.items_ = nullptr;
+  other.min_value_ = nullptr;
+  other.max_value_ = nullptr;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(const kll_sketch& other) {
+  kll_sketch<T, C, S, A> copy(other);
+  std::swap(k_, copy.k_);
+  std::swap(m_, copy.m_);
+  std::swap(min_k_, copy.min_k_);
+  std::swap(n_, copy.n_);
+  std::swap(num_levels_, copy.num_levels_);
+  std::swap(levels_, copy.levels_);
+  std::swap(items_, copy.items_);
+  std::swap(items_size_, copy.items_size_);
+  std::swap(min_value_, copy.min_value_);
+  std::swap(max_value_, copy.max_value_);
+  std::swap(is_level_zero_sorted_, copy.is_level_zero_sorted_);
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch&& other) {
+  std::swap(k_, other.k_);
+  std::swap(m_, other.m_);
+  std::swap(min_k_, other.min_k_);
+  std::swap(n_, other.n_);
+  std::swap(num_levels_, other.num_levels_);
+  std::swap(levels_, other.levels_);
+  std::swap(items_, other.items_);
+  std::swap(items_size_, other.items_size_);
+  std::swap(min_value_, other.min_value_);
+  std::swap(max_value_, other.max_value_);
+  std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::~kll_sketch() {
+  if (items_ != nullptr) {
+    const uint32_t begin = levels_[0];
+    const uint32_t end = levels_[num_levels_];
+    for (uint32_t i = begin; i < end; i++) items_[i].~T();
+    A().deallocate(items_, items_size_);
+  }
+  if (min_value_ != nullptr) {
+    min_value_->~T();
+    A().deallocate(min_value_, 1);
+  }
+  if (max_value_ != nullptr) {
+    max_value_->~T();
+    A().deallocate(max_value_, 1);
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update(const T& value) {
+  if (!check_update_value(value)) { return; }
+  update_min_max(value);
+  const uint32_t index = internal_update();
+  new (&items_[index]) T(value);
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update(T&& value) {
+  if (!check_update_value(value)) { return; }
+  update_min_max(value);
+  const uint32_t index = internal_update();
+  new (&items_[index]) T(std::move(value));
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(value);
+    max_value_ = new (A().allocate(1)) T(value);
+  } else {
+    if (C()(value, *min_value_)) *min_value_ = value;
+    if (C()(*max_value_, value)) *max_value_ = value;
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::internal_update() {
+  if (levels_[0] == 0) compress_while_updating();
+  n_++;
+  is_level_zero_sorted_ = false;
+  return --levels_[0];
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
+  if (other.is_empty()) return;
+  if (m_ != other.m_) {
+    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+  }
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(*other.min_value_);
+    max_value_ = new (A().allocate(1)) T(*other.max_value_);
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
+  }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(other.items_[i]);
+  }
+  if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
+  n_ = final_n;
+  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
+  assert_correct_total_weight();
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(kll_sketch&& other) {
+  if (other.is_empty()) return;
+  if (m_ != other.m_) {
+    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+  }
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
+    max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
+  }
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(std::move(other.items_[i]));
+  }
+  if (other.num_levels_ >= 2) merge_higher_levels(std::forward<kll_sketch>(other), final_n);
+  n_ = final_n;
+  if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
+  assert_correct_total_weight();
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::is_empty() const {
+  return n_ == 0;
+}
+
+template<typename T, typename C, typename S, typename A>
+uint64_t kll_sketch<T, C, S, A>::get_n() const {
+  return n_;
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::get_num_retained() const {
+  return levels_[num_levels_] - levels_[0];
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::is_estimation_mode() const {
+  return num_levels_ > 1;
+}
+
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_min_value() const {
+  if (is_empty()) return get_invalid_value();
+  return *min_value_;
+}
+
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_max_value() const {
+  if (is_empty()) return get_invalid_value();
+  return *max_value_;
+}
+
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_quantile(double fraction) const {
+  if (is_empty()) return get_invalid_value();
+  if (fraction == 0.0) return *min_value_;
+  if (fraction == 1.0) return *max_value_;
+  if ((fraction < 0.0) || (fraction > 1.0)) {
+    throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
+  }
+  // has side effect of sorting level zero if needed
+  auto quantile_calculator(const_cast<kll_sketch*>(this)->get_quantile_calculator());
+  return quantile_calculator->get_quantile(fraction);
+}
+
+template<typename T, typename C, typename S, typename A>
+std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
+  std::vector<T, A> quantiles;
+  if (is_empty()) return quantiles;
+  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
+  quantiles.reserve(size);
+  for (uint32_t i = 0; i < size; i++) {
+    const double fraction = fractions[i];
+    if ((fraction < 0.0) || (fraction > 1.0)) {
+      throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
+    }
+    if      (fraction == 0.0) quantiles.push_back(*min_value_);
+    else if (fraction == 1.0) quantiles.push_back(*max_value_);
+    else {
+      if (!quantile_calculator) {
+        // has side effect of sorting level zero if needed
+        quantile_calculator = const_cast<kll_sketch*>(this)->get_quantile_calculator();
+      }
+      quantiles.push_back(quantile_calculator->get_quantile(fraction));
+    }
+  }
+  return quantiles;
+}
+
+template<typename T, typename C, typename S, typename A>
+std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(size_t num) const {
+  if (is_empty()) return std::vector<T, A>();
+  if (num == 0) {
+    throw std::invalid_argument("num must be > 0");
+  }
+  std::vector<double> fractions(num);
+  fractions[0] = 0.0;
+  for (size_t i = 1; i < num; i++) {
+    fractions[i] = static_cast<double>(i) / (num - 1);
+  }
+  if (num > 1) {
+    fractions[num - 1] = 1.0;
+  }
+  return get_quantiles(fractions.data(), num);
+}
+
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
+  if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
+  uint8_t level(0);
+  uint64_t weight(1);
+  uint64_t total(0);
+  while (level < num_levels_) {
+    const auto from_index(levels_[level]);
+    const auto to_index(levels_[level + 1]); // exclusive
+    for (uint32_t i = from_index; i < to_index; i++) {
+      if (C()(items_[i], value)) {
+        total += weight;
+      } else if ((level > 0) || is_level_zero_sorted_) {
+        break; // levels above 0 are sorted, no point comparing further
+      }
+    }
+    level++;
+    weight *= 2;
+  }
+  return (double) total / n_;
+}
+
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const {
+  return get_PMF_or_CDF(split_points, size, false);
+}
+
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const {
+  return get_PMF_or_CDF(split_points, size, true);
+}
+
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {
+  return get_normalized_rank_error(min_k_, pmf);
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (is_empty()) { return EMPTY_SIZE_BYTES; }
+  if (num_levels_ == 1 && get_num_retained() == 1) {
+    return DATA_START_SINGLE_ITEM + sizeof(TT);
+  }
+  // the last integer in the levels_ array is not serialized because it can be derived
+  return DATA_START + num_levels_ * sizeof(uint32_t) + (get_num_retained() + 2) * sizeof(TT);
+}
+
+// implementation for all other types
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (is_empty()) { return EMPTY_SIZE_BYTES; }
+  if (num_levels_ == 1 && get_num_retained() == 1) {
+    return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
+  }
+  // the last integer in the levels_ array is not serialized because it can be derived
+  size_t size = DATA_START + num_levels_ * sizeof(uint32_t);
+  size += S().size_of_item(*min_value_);
+  size += S().size_of_item(*max_value_);
+  for (auto& it: *this) size += S().size_of_item(it.first);
+  return size;
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
+  const bool is_single_item = n_ == 1;
+  const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
+  os.write(reinterpret_cast<const char*>(&preamble_ints), sizeof(preamble_ints));
+  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
+  os.write(reinterpret_cast<const char*>(&serial_version), sizeof(serial_version));
+  const uint8_t family(FAMILY);
+  os.write(reinterpret_cast<const char*>(&family), sizeof(family));
+  const uint8_t flags_byte(
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
+  );
+  os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
+  os.write((char*)&k_, sizeof(k_));
+  os.write((char*)&m_, sizeof(m_));
+  const uint8_t unused(0);
+  os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
+  if (is_empty()) return;
+  if (!is_single_item) {
+    os.write((char*)&n_, sizeof(n_));
+    os.write((char*)&min_k_, sizeof(min_k_));
+    os.write((char*)&num_levels_, sizeof(num_levels_));
+    os.write((char*)&unused, sizeof(unused));
+    os.write((char*)levels_.data(), sizeof(levels_[0]) * num_levels_);
+    S().serialize(os, min_value_, 1);
+    S().serialize(os, max_value_, 1);
+  }
+  S().serialize(os, &items_[levels_[0]], get_num_retained());
+}
+
+template<typename T, typename C, typename S, typename A>
+vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
+  const bool is_single_item = n_ == 1;
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  vector_u8<A> bytes(size);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  uint8_t* end_ptr = ptr + size;
+  const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
+  ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
+  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
+  ptr += copy_to_mem(&serial_version, ptr, sizeof(serial_version));
+  const uint8_t family(FAMILY);
+  ptr += copy_to_mem(&family, ptr, sizeof(family));
+  const uint8_t flags_byte(
+      (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
+  );
+  ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
+  ptr += copy_to_mem(&k_, ptr, sizeof(k_));
+  ptr += copy_to_mem(&m_, ptr, sizeof(m_));
+  const uint8_t unused(0);
+  ptr += copy_to_mem(&unused, ptr, sizeof(unused));
+  if (!is_empty()) {
+    if (!is_single_item) {
+      ptr += copy_to_mem(&n_, ptr, sizeof(n_));
+      ptr += copy_to_mem(&min_k_, ptr, sizeof(min_k_));
+      ptr += copy_to_mem(&num_levels_, ptr, sizeof(num_levels_));
+      ptr += copy_to_mem(&unused, ptr, sizeof(unused));
+      ptr += copy_to_mem(levels_.data(), ptr, sizeof(levels_[0]) * num_levels_);
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
+    }
+    const size_t bytes_remaining = end_ptr - ptr;
+    ptr += S().serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
+  }
+  const size_t delta = ptr - bytes.data();
+  if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
+  return bytes;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
+  uint8_t preamble_ints;
+  is.read((char*)&preamble_ints, sizeof(preamble_ints));
+  uint8_t serial_version;
+  is.read((char*)&serial_version, sizeof(serial_version));
+  uint8_t family_id;
+  is.read((char*)&family_id, sizeof(family_id));
+  uint8_t flags_byte;
+  is.read((char*)&flags_byte, sizeof(flags_byte));
+  uint16_t k;
+  is.read((char*)&k, sizeof(k));
+  uint8_t m;
+  is.read((char*)&m, sizeof(m));
+  uint8_t unused;
+  is.read((char*)&unused, sizeof(unused));
+
+  check_m(m);
+  check_preamble_ints(preamble_ints, flags_byte);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+
+  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
+  if (!is.good()) throw std::runtime_error("error reading from std::istream"); 
+  if (is_empty) return kll_sketch<T, C, S, A>(k);
+
+  uint64_t n;
+  uint16_t min_k;
+  uint8_t num_levels;
+  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
+  if (is_single_item) {
+    n = 1;
+    min_k = k;
+    num_levels = 1;
+  } else {
+    is.read((char*)&n, sizeof(n_));
+    is.read((char*)&min_k, sizeof(min_k_));
+    is.read((char*)&num_levels, sizeof(num_levels));
+    is.read((char*)&unused, sizeof(unused));
+  }
+  vector_u32<A> levels(num_levels + 1);
+  const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
+  if (is_single_item) {
+    levels[0] = capacity - 1;
+  } else {
+    // the last integer in levels_ is not serialized because it can be derived
+    is.read((char*)levels.data(), sizeof(levels[0]) * num_levels);
+  }
+  levels[num_levels] = capacity;
+  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value;
+  std::unique_ptr<T, item_deleter> max_value;
+  if (!is_single_item) {
+    S().deserialize(is, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    S().deserialize(is, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  const auto num_items = levels[num_levels] - levels[0];
+  S().deserialize(is, &items_buffer.get()[levels[0]], num_items);
+  // serde call did not throw, repackage with destrtuctors
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
+  if (is_single_item) {
+    new (min_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destrtuctor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    new (max_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destrtuctor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream");
+  return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
+      std::move(min_value), std::move(max_value), is_level_zero_sorted);
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
+  ensure_minimum_memory(size, 8);
+  const char* ptr = static_cast<const char*>(bytes);
+  uint8_t preamble_ints;
+  ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
+  uint8_t family_id;
+  ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
+  uint8_t flags_byte;
+  ptr += copy_from_mem(ptr, &flags_byte, sizeof(flags_byte));
+  uint16_t k;
+  ptr += copy_from_mem(ptr, &k, sizeof(k));
+  uint8_t m;
+  ptr += copy_from_mem(ptr, &m, sizeof(m));
+  ptr++; // skip unused byte
+
+  check_m(m);
+  check_preamble_ints(preamble_ints, flags_byte);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+  ensure_minimum_memory(size, 1 << preamble_ints);
+
+  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
+  if (is_empty) return kll_sketch<T, C, S, A>(k);
+
+  uint64_t n;
+  uint16_t min_k;
+  uint8_t num_levels;
+  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+  if (is_single_item) {
+    n = 1;
+    min_k = k;
+    num_levels = 1;
+  } else {
+    ptr += copy_from_mem(ptr, &n, sizeof(n));
+    ptr += copy_from_mem(ptr, &min_k, sizeof(min_k));
+    ptr += copy_from_mem(ptr, &num_levels, sizeof(num_levels));
+    ptr++; // skip unused byte
+  }
+  vector_u32<A> levels(num_levels + 1);
+  const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
+  if (is_single_item) {
+    levels[0] = capacity - 1;
+  } else {
+    // the last integer in levels_ is not serialized because it can be derived
+    ptr += copy_from_mem(ptr, levels.data(), sizeof(levels[0]) * num_levels);
+  }
+  levels[num_levels] = capacity;
+  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value;
+  std::unique_ptr<T, item_deleter> max_value;
+  if (!is_single_item) {
+    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destrtuctor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  const auto num_items = levels[num_levels] - levels[0];
+  ptr += S().deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
+  // serde call did not throw, repackage with destrtuctors
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  const size_t delta = ptr - static_cast<const char*>(bytes);
+  if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
+  const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
+  if (is_single_item) {
+    new (min_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destrtuctor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    new (max_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destrtuctor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
+      std::move(min_value), std::move(max_value), is_level_zero_sorted);
+}
+
+/*
+ * Gets the normalized rank error given k and pmf.
+ * k - the configuration parameter
+ * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+ * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+ * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
+ */
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_normalized_rank_error(uint16_t k, bool pmf) {
+  return pmf
+      ? 2.446 / pow(k, 0.9433)
+      : 2.296 / pow(k, 0.9723);
+}
+
+// for deserialization
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
+    std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
+    std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted):
+k_(k),
+m_(DEFAULT_M),
+min_k_(min_k),
+n_(n),
+num_levels_(num_levels),
+levels_(std::move(levels)),
+items_(items.release()),
+items_size_(items_size),
+min_value_(min_value.release()),
+max_value_(max_value.release()),
+is_level_zero_sorted_(is_level_zero_sorted)
+{}
+
+// The following code is only valid in the special case of exactly reaching capacity while updating.
+// It cannot be used while merging, while reducing k, or anything else.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::compress_while_updating(void) {
+  const uint8_t level = find_level_to_compact();
+
+  // It is important to add the new top level right here. Be aware that this operation
+  // grows the buffer and shifts the data and also the boundaries of the data and grows the
+  // levels array and increments num_levels_
+  if (level == (num_levels_ - 1)) {
+    add_empty_top_level_to_completely_full_sketch();
+  }
+
+  const uint32_t raw_beg = levels_[level];
+  const uint32_t raw_lim = levels_[level + 1];
+  // +2 is OK because we already added a new top level if necessary
+  const uint32_t pop_above = levels_[level + 2] - raw_lim;
+  const uint32_t raw_pop = raw_lim - raw_beg;
+  const bool odd_pop = kll_helper::is_odd(raw_pop);
+  const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
+  const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+  const uint32_t half_adj_pop = adj_pop / 2;
+  const uint32_t destroy_beg = levels_[0];
+
+  // level zero might not be sorted, so we must sort it if we wish to compact it
+  // sort_level_zero() is not used here because of the adjustment for odd number of items
+  if ((level == 0) && !is_level_zero_sorted_) {
+    std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop], C());
+  }
+  if (pop_above == 0) {
+    kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
+  } else {
+    kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
+    kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
+  }
+  levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
+  if (odd_pop) {
+    levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
+    if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]); // namely this leftover guy
+  } else {
+    levels_[level] = levels_[level + 1]; // the current level is now empty
+  }
+
+  // verify that we freed up half_adj_pop array slots just below the current level
+  if (levels_[level] != (raw_beg + half_adj_pop)) throw std::logic_error("compaction error");
+
+  // finally, we need to shift up the data in the levels below
+  // so that the freed-up space can be used by level zero
+  if (level > 0) {
+    const uint32_t amount = raw_beg - levels_[0];
+    std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
+    for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
+  }
+  for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
+}
+
+template<typename T, typename C, typename S, typename A>
+uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
+  uint8_t level = 0;
+  while (true) {
+    if (level >= num_levels_) throw std::logic_error("capacity calculation error");
+    const uint32_t pop = levels_[level + 1] - levels_[level];
+    const uint32_t cap = kll_helper::level_capacity(k_, num_levels_, level, m_);
+    if (pop >= cap) {
+      return level;
+    }
+    level++;
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::add_empty_top_level_to_completely_full_sketch() {
+  const uint32_t cur_total_cap = levels_[num_levels_];
+
+  // make sure that we are following a certain growth scheme
+  if (levels_[0] != 0) throw std::logic_error("full sketch expected");
+  if (items_size_ != cur_total_cap) throw std::logic_error("current capacity mismatch");
+
+  // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
+  const uint8_t new_levels_size = num_levels_ + 2;
+  if (levels_.size() < new_levels_size) {
+    levels_.resize(new_levels_size);
+  }
+
+  const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
+  const uint32_t new_total_cap = cur_total_cap + delta_cap;
+
+  // move (and shift) the current data into the new buffer
+  T* new_buf = A().allocate(new_total_cap);
+  kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap, true);
+  A().deallocate(items_, items_size_);
+  items_ = new_buf;
+  items_size_ = new_total_cap;
+
+  // this loop includes the old "extra" index at the top
+  for (uint8_t i = 0; i <= num_levels_; i++) {
+    levels_[i] += delta_cap;
+  }
+
+  if (levels_[num_levels_] != new_total_cap) throw std::logic_error("new capacity mismatch");
+
+  num_levels_++;
+  levels_[num_levels_] = new_total_cap; // initialize the new "extra" index at the top
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::sort_level_zero() {
+  if (!is_level_zero_sorted_) {
+    std::sort(&items_[levels_[0]], &items_[levels_[1]], C());
+    is_level_zero_sorted_ = true;
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
+  sort_level_zero();
+  typedef typename std::allocator_traits<A>::template rebind_alloc<kll_quantile_calculator<T, C, A>> AllocCalc;
+  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator(
+    new (AllocCalc().allocate(1)) kll_quantile_calculator<T, C, A>(items_, levels_.data(), num_levels_, n_),
+    [](kll_quantile_calculator<T, C, A>* ptr){ ptr->~kll_quantile_calculator<T, C, A>(); AllocCalc().deallocate(ptr, 1); }
+  );
+  return quantile_calculator;
+}
+
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
+  if (is_empty()) return vector_d<A>();
+  kll_helper::validate_values<T, C>(split_points, size);
+  vector_d<A> buckets(size + 1, 0);
+  uint8_t level(0);
+  uint64_t weight(1);
+  while (level < num_levels_) {
+    const auto from_index = levels_[level];
+    const auto to_index = levels_[level + 1]; // exclusive
+    if ((level == 0) && !is_level_zero_sorted_) {
+      increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, buckets.data());
+    } else {
+      increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, buckets.data());
+    }
+    level++;
+    weight *= 2;
+  }
+  // normalize and, if CDF, convert to cumulative
+  if (is_CDF) {
+    double subtotal = 0;
+    for (uint32_t i = 0; i <= size; i++) {
+      subtotal += buckets[i];
+      buckets[i] = subtotal / n_;
+    }
+  } else {
+    for (uint32_t i = 0; i <= size; i++) {
+      buckets[i] /= n_;
+    }
+  }
+  return buckets;
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+    const T* split_points, uint32_t size, double* buckets) const
+{
+  for (uint32_t i = from_index; i < to_index; i++) {
+    uint32_t j;
+    for (j = 0; j < size; j++) {
+      if (C()(items_[i], split_points[j])) {
+        break;
+      }
+    }
+    buckets[j] += weight;
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+    const T* split_points, uint32_t size, double* buckets) const
+{
+  uint32_t i = from_index;
+  uint32_t j = 0;
+  while ((i <  to_index) && (j < size)) {
+    if (C()(items_[i], split_points[j])) {
+      buckets[j] += weight; // this sample goes into this bucket
+      i++; // move on to next sample and see whether it also goes into this bucket
+    } else {
+      j++; // no more samples for this bucket
+    }
+  }
+  // now either i == to_index (we are out of samples), or
+  // j == size (we are out of buckets, but there are more samples remaining)
+  // we only need to do something in the latter case
+  if (j == size) {
+    buckets[j] += weight * (to_index - i);
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+template<typename O>
+void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
+  const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
+  auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
+  const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
+  const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
+  const size_t work_levels_size = ub + 2; // ub+1 does not work
+  vector_u32<A> worklevels(work_levels_size);
+  vector_u32<A> outlevels(work_levels_size);
+
+  const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
+
+  populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.data(), provisional_num_levels);
+
+  const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
+      worklevels.data(), outlevels.data(), is_level_zero_sorted_);
+
+  // ub can sometimes be much bigger
+  if (result.final_num_levels > ub) throw std::logic_error("merge error");
+
+  // now we need to transfer the results back into "this" sketch
+  if (result.final_capacity != items_size_) {
+    A().deallocate(items_, items_size_);
+    items_size_ = result.final_capacity;
+    items_ = A().allocate(items_size_);
+  }
+  const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
+  kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
+
+  if (levels_.size() < (result.final_num_levels + 1)) {
+    levels_.resize(result.final_num_levels + 1);
+  }
+  const uint32_t offset = free_space_at_bottom - outlevels[0];
+  for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index
+    levels_[lvl] = outlevels[lvl] + offset;
+  }
+  num_levels_ = result.final_num_levels;
+}
+
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version copies objects from the incoming sketch
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+  worklevels[0] = 0;
+
+  // the level zero data from "other" was already inserted into "this"
+  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+  worklevels[1] = safe_level_size(0);
+
+  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+    const uint32_t self_pop = safe_level_size(lvl);
+    const uint32_t other_pop = other.safe_level_size(lvl);
+    worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+    if ((self_pop > 0) && (other_pop == 0)) {
+      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+    } else if ((self_pop == 0) && (other_pop > 0)) {
+      kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl]);
+    } else if ((self_pop > 0) && (other_pop > 0)) {
+      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+    }
+  }
+}
+
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version moves objects from the incoming sketch
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+  worklevels[0] = 0;
+
+  // the level zero data from "other" was already inserted into "this"
+  kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+  worklevels[1] = safe_level_size(0);
+
+  for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+    const uint32_t self_pop = safe_level_size(lvl);
+    const uint32_t other_pop = other.safe_level_size(lvl);
+    worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+    if ((self_pop > 0) && (other_pop == 0)) {
+      kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+    } else if ((self_pop == 0) && (other_pop > 0)) {
+      kll_helper::move_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl], false);
+    } else if ((self_pop > 0) && (other_pop > 0)) {
+      kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+    }
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
+  const uint64_t total(kll_helper::sum_the_sample_weights(num_levels_, levels_.data()));
+  if (total != n_) {
+    throw std::logic_error("Total weight does not match N");
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::safe_level_size(uint8_t level) const {
+  if (level >= num_levels_) return 0;
+  return levels_[level + 1] - levels_[level];
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::get_num_retained_above_level_zero() const {
+  if (num_levels_ == 1) return 0;
+  return levels_[num_levels_] - levels_[1];
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_m(uint8_t m) {
+  if (m != DEFAULT_M) {
+    throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M)
+        + ": " + std::to_string(m));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
+  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
+  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM));
+  if (is_empty || is_single_item) {
+    if (preamble_ints != PREAMBLE_INTS_SHORT) {
+      throw std::invalid_argument("Possible corruption: preamble ints must be "
+          + std::to_string(PREAMBLE_INTS_SHORT) + " for an empty or single item sketch: " + std::to_string(preamble_ints));
+    }
+  } else {
+    if (preamble_ints != PREAMBLE_INTS_FULL) {
+      throw std::invalid_argument("Possible corruption: preamble ints must be "
+          + std::to_string(PREAMBLE_INTS_FULL) + " for a sketch with more than one item: " + std::to_string(preamble_ints));
+    }
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
+  if (serial_version != SERIAL_VERSION_1 && serial_version != SERIAL_VERSION_2) {
+    throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
+        + std::to_string(SERIAL_VERSION_1) + " or " + std::to_string(SERIAL_VERSION_2)
+        + ", got " + std::to_string(serial_version));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
+  if (family_id != FAMILY) {
+    throw std::invalid_argument("Possible corruption: family mismatch: expected "
+        + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
+  }
+}
+
+template <typename T, typename C, typename S, typename A>
+string<A> kll_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+  os << "### KLL sketch summary:" << std::endl;
+  os << "   K              : " << k_ << std::endl;
+  os << "   min K          : " << min_k_ << std::endl;
+  os << "   M              : " << (unsigned int) m_ << std::endl;
+  os << "   N              : " << n_ << std::endl;
+  os << "   Epsilon        : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << std::endl;
+  os << "   Epsilon PMF    : " << get_normalized_rank_error(true) * 100 << "%" << std::endl;
+  os << "   Empty          : " << (is_empty() ? "true" : "false") << std::endl;
+  os << "   Estimation mode: " << (is_estimation_mode() ? "true" : "false") << std::endl;
+  os << "   Levels         : " << (unsigned int) num_levels_ << std::endl;
+  os << "   Sorted         : " << (is_level_zero_sorted_ ? "true" : "false") << std::endl;
+  os << "   Capacity items : " << items_size_ << std::endl;
+  os << "   Retained items : " << get_num_retained() << std::endl;
+  os << "   Storage bytes  : " << get_serialized_size_bytes() << std::endl;
+  if (!is_empty()) {
+    os << "   Min value      : " << *min_value_ << std::endl;
+    os << "   Max value      : " << *max_value_ << std::endl;
+  }
+  os << "### End sketch summary" << std::endl;
+
+  if (print_levels) {
+    os << "### KLL sketch levels:" << std::endl;
+    os << "   index: nominal capacity, actual size" << std::endl;
+    for (uint8_t i = 0; i < num_levels_; i++) {
+      os << "   " << (unsigned int) i << ": " << kll_helper::level_capacity(k_, num_levels_, i, m_) << ", " << safe_level_size(i) << std::endl;
+    }
+    os << "### End sketch levels" << std::endl;
+  }
+
+  if (print_items) {
+    os << "### KLL sketch data:" << std::endl;
+    uint8_t level(0);
+    while (level < num_levels_) {
+      const uint32_t from_index = levels_[level];
+      const uint32_t to_index = levels_[level + 1]; // exclusive
+      if (from_index < to_index) {
+        os << " level " << (unsigned int) level << ":" << std::endl;
+      }
+      for (uint32_t i = from_index; i < to_index; i++) {
+        os << "   " << items_[i] << std::endl;
+      }
+      level++;
+    }
+    os << "### End sketch data" << std::endl;
+  }
+  return os.str();
+}
+
+template <typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::begin() const {
+  return kll_sketch<T, C, S, A>::const_iterator(items_, levels_.data(), num_levels_);
+}
+
+template <typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::end() const {
+  return kll_sketch<T, C, S, A>::const_iterator(nullptr, nullptr, num_levels_);
+}
+
+// kll_sketch::const_iterator implementation
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::const_iterator::const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels):
+items(items), levels(levels), num_levels(num_levels), index(levels == nullptr ? 0 : levels[0]), level(levels == nullptr ? num_levels : 0), weight(1)
+{}
+
+template<typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++() {
+  ++index;
+  if (index == levels[level + 1]) { // go to the next non-empty level
+    do {
+      ++level;
+      weight *= 2;
+    } while (level < num_levels && levels[level] == levels[level + 1]);
+  }
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++(int) {
+  const_iterator tmp(*this);
+  operator++();
+  return tmp;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
+  if (level != other.level) return false;
+  if (level == num_levels) return true; // end
+  return index == other.index;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
+  return !operator==(other);
+}
+
+template<typename T, typename C, typename S, typename A>
+const std::pair<const T&, const uint64_t> kll_sketch<T, C, S, A>::const_iterator::operator*() const {
+  return std::pair<const T&, const uint64_t>(items[index], weight);
+}
+
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::item_deleter {
+  public:
+  void operator() (T* ptr) const {
+    if (ptr != nullptr) {
+      ptr->~T();
+      A().deallocate(ptr, 1);
+    }
+  }
+};
+
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::items_deleter {
+  public:
+  items_deleter(uint32_t start, uint32_t num): start(start), num(num) {}
+  void operator() (T* ptr) const {
+    if (ptr != nullptr) {
+      for (uint32_t i = start; i < num; ++i) {
+        ptr[i].~T();
+      }
+      A().deallocate(ptr, num);
+    }
+  }
+  private:
+  uint32_t start;
+  uint32_t num;
+};
+
+} /* namespace datasketches */
+
+#endif
diff --git a/be/src/thirdparty/datasketches/memory_operations.hpp b/be/src/thirdparty/datasketches/memory_operations.hpp
new file mode 100644
index 0000000..80dc3a3
--- /dev/null
+++ b/be/src/thirdparty/datasketches/memory_operations.hpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _MEMORY_OPERATIONS_HPP_
+#define _MEMORY_OPERATIONS_HPP_
+
+#include <memory>
+#include <exception>
+#include <iostream>
+
+namespace datasketches {
+
+static inline void ensure_minimum_memory(size_t bytes_available, size_t min_needed) {
+  if (bytes_available < min_needed) {
+    throw std::out_of_range("Insufficient buffer size detected: bytes available "
+    + std::to_string(bytes_available) + ", minimum needed " + std::to_string(min_needed));
+  }  
+}
+
+static inline void check_memory_size(size_t requested_index, size_t capacity) {
+  if (requested_index > capacity) {
+    throw std::out_of_range("Attempt to access memory beyond limits: requested index "
+    + std::to_string(requested_index) + ", capacity " + std::to_string(capacity));
+  }
+}
+
+// note: size is in bytes, not items
+static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
+  memcpy(dst, src, size);
+  return size;
+}
+
+// note: size is in bytes, not items
+static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
+  memcpy(dst, src, size);
+  return size;
+}
+
+} // namespace
+
+#endif // _MEMORY_OPERATIONS_HPP_
diff --git a/be/src/thirdparty/datasketches/serde.hpp b/be/src/thirdparty/datasketches/serde.hpp
new file mode 100644
index 0000000..d0819d8
--- /dev/null
+++ b/be/src/thirdparty/datasketches/serde.hpp
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATASKETCHES_SERDE_HPP_
+#define DATASKETCHES_SERDE_HPP_
+
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <exception>
+
+#include "memory_operations.hpp"
+
+namespace datasketches {
+
+// serialize and deserialize
+template<typename T, typename Enable = void> struct serde {
+  // stream serialization
+  void serialize(std::ostream& os, const T* items, unsigned num);
+  void deserialize(std::istream& is, T* items, unsigned num); // items allocated but not initialized
+
+  // raw bytes serialization
+  size_t size_of_item(const T& item);
+  size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num);
+  size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num); // items allocated but not initialized
+};
+
+// serde for all fixed-size arithmetic types (int and float of different sizes)
+// in particular, kll_sketch<int64_t> should produce sketches binary-compatible
+// with LongsSketch and ItemsSketch<Long> with ArrayOfLongsSerDe in Java
+template<typename T>
+struct serde<T, typename std::enable_if<std::is_arithmetic<T>::value>::type> {
+  void serialize(std::ostream& os, const T* items, unsigned num) {
+    bool failure = false;
+    try {
+      os.write(reinterpret_cast<const char*>(items), sizeof(T) * num);
+    } catch (std::ostream::failure& e) {
+      failure = true;
+    }
+    if (failure || !os.good()) {
+      throw std::runtime_error("error writing to std::ostream with " + std::to_string(num) + " items");
+    }
+  }
+  void deserialize(std::istream& is, T* items, unsigned num) {
+    bool failure = false;
+    try {
+      is.read((char*)items, sizeof(T) * num);
+    } catch (std::istream::failure& e) {
+      failure = true;
+    }
+    if (failure || !is.good()) {
+      throw std::runtime_error("error reading from std::istream with " + std::to_string(num) + " items");
+    }
+  }
+
+  size_t size_of_item(const T&) {
+    return sizeof(T);
+  }
+  size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num) {
+    const size_t bytes_written = sizeof(T) * num;
+    check_memory_size(bytes_written, capacity);
+    memcpy(ptr, items, bytes_written);
+    return bytes_written;
+  }
+  size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) {
+    const size_t bytes_read = sizeof(T) * num;
+    check_memory_size(bytes_read, capacity);
+    memcpy(items, ptr, bytes_read);
+    return bytes_read;
+  }
+};
+
+// serde for std::string items
+// This should produce sketches binary-compatible with
+// ItemsSketch<String> with ArrayOfStringsSerDe in Java.
+// The length of each string is stored as a 32-bit integer (historically),
+// which may be too wasteful. Treat this as an example.
+template<>
+struct serde<std::string> {
+  void serialize(std::ostream& os, const std::string* items, unsigned num) {
+    unsigned i = 0;
+    bool failure = false;
+    try {
+      for (; i < num && os.good(); i++) {
+        uint32_t length = items[i].size();
+        os.write((char*)&length, sizeof(length));
+        os.write(items[i].c_str(), length);
+      }
+    } catch (std::ostream::failure& e) {
+      failure = true;
+    }
+    if (failure || !os.good()) {
+      throw std::runtime_error("error writing to std::ostream at item " + std::to_string(i));
+    }
+  }
+  void deserialize(std::istream& is, std::string* items, unsigned num) {
+    unsigned i = 0;
+    bool failure = false;
+    try {
+      for (; i < num; i++) {
+        uint32_t length;
+        is.read((char*)&length, sizeof(length));
+        if (!is.good()) { break; }
+        std::string str;
+        str.reserve(length);
+        for (uint32_t j = 0; j < length; j++) {
+          str.push_back(is.get());
+        }
+        if (!is.good()) { break; }
+        new (&items[i]) std::string(std::move(str));
+      }
+    } catch (std::istream::failure& e) {
+      failure = true;
+    }
+    if (failure || !is.good()) {
+      // clean up what we've already allocated
+      for (unsigned j = 0; j < i; ++j) {
+        items[j].~basic_string();
+      }
+      throw std::runtime_error("error reading from std::istream at item " + std::to_string(i)); 
+    }
+  }
+  size_t size_of_item(const std::string& item) {
+    return sizeof(uint32_t) + item.size();
+  }
+  size_t serialize(void* ptr, size_t capacity, const std::string* items, unsigned num) {
+    size_t bytes_written = 0;
+    for (unsigned i = 0; i < num; ++i) {
+      const uint32_t length = items[i].size();
+      const size_t new_bytes = length + sizeof(length);
+      check_memory_size(bytes_written + new_bytes, capacity);
+      memcpy(ptr, &length, sizeof(length));
+      ptr = static_cast<char*>(ptr) + sizeof(uint32_t);
+      memcpy(ptr, items[i].c_str(), length);
+      ptr = static_cast<char*>(ptr) + length;
+      bytes_written += new_bytes;
+    }
+    return bytes_written;
+  }
+  size_t deserialize(const void* ptr, size_t capacity, std::string* items, unsigned num) {
+    size_t bytes_read = 0;
+    unsigned i = 0;
+    bool failure = false;
+    for (; i < num && !failure; ++i) {
+      uint32_t length;
+      if (bytes_read + sizeof(length) > capacity) {
+        bytes_read += sizeof(length); // we'll use this to report the error
+        failure = true;
+        break;
+      }
+      memcpy(&length, ptr, sizeof(length));
+      ptr = static_cast<const char*>(ptr) + sizeof(uint32_t);
+      bytes_read += sizeof(length);
+
+      if (bytes_read + length > capacity) {
+        bytes_read += length; // we'll use this to report the error
+        failure = true;
+        break;
+      }
+      new (&items[i]) std::string(static_cast<const char*>(ptr), length);
+      ptr = static_cast<const char*>(ptr) + length;
+      bytes_read += length;
+    }
+
+    if (failure) {
+      // clean up what we've already allocated
+      for (unsigned j = 0; j < i; ++j)
+        items[j].~basic_string();
+      // using this for a consistent error message
+      check_memory_size(bytes_read, capacity);
+    }
+
+    return bytes_read;
+  }
+};
+
+} /* namespace datasketches */
+
+# endif