You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@impala.apache.org by st...@apache.org on 2020/07/28 08:00:23 UTC
[impala] 01/02: IMPALA-9882: Import KLL functionality from Apache
DataSketches
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit a8fd3213f6e54d73658b66cddcc5b0c955dda283
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Wed Jun 24 10:46:13 2020 +0200
IMPALA-9882: Import KLL functionality from Apache DataSketches
First, I updated our existing snapshot of DataSketches to the
following commit:
c67d92faad3827932ca3b5d864222e64977f2c20
"Merge pull request #166 from gaborkaszab/const_cast"
This affects files originating from the kll/ and common/ directories of
the DataSketches repo.
Then I copied all the files needed for KLL into our snapshot
directory.
You can find the original Apache DataSketches files here:
https://github.com/apache/incubator-datasketches-cpp
However, this new snapshot broke the interface we used for
serializing hll_union objects by dropping serialize_compact(). As a
solution, I changed the serialization and merging phases of the union
operator so that they serialize the underlying hll_sketch instead of
the hll_union itself.
Change-Id: I848488d5145c808109bd50aecfbf3ef83f981943
Reviewed-on: http://gerrit.cloudera.org:8080/16196
Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/CMakeLists.txt | 2 +-
be/src/exprs/aggregate-functions-ir.cc | 39 +-
be/src/exprs/datasketches-test.cc | 43 +-
.../datasketches/AuxHashMap-internal.hpp | 12 +-
be/src/thirdparty/datasketches/CommonUtil.hpp | 71 --
.../CompositeInterpolationXTable-internal.hpp | 6 +-
.../datasketches/CompositeInterpolationXTable.hpp | 6 +-
.../datasketches/CouponHashSet-internal.hpp | 19 +-
.../datasketches/CouponList-internal.hpp | 18 +-
.../thirdparty/datasketches/Hll4Array-internal.hpp | 2 +-
.../thirdparty/datasketches/HllArray-internal.hpp | 24 +-
.../thirdparty/datasketches/HllSketch-internal.hpp | 50 +-
.../datasketches/HllSketchImplFactory.hpp | 1 -
.../thirdparty/datasketches/HllUnion-internal.hpp | 48 +-
be/src/thirdparty/datasketches/HllUtil.hpp | 54 +-
be/src/thirdparty/datasketches/MurmurHash3.h | 28 +-
be/src/thirdparty/datasketches/README.md | 19 +-
.../datasketches/bounds_binomial_proportions.hpp | 291 +++++
...siteInterpolationXTable.hpp => common_defs.hpp} | 22 +-
be/src/thirdparty/datasketches/count_zeros.hpp | 114 ++
be/src/thirdparty/datasketches/hll.hpp | 87 +-
be/src/thirdparty/datasketches/kll_helper.hpp | 150 +++
be/src/thirdparty/datasketches/kll_helper_impl.hpp | 319 ++++++
.../datasketches/kll_quantile_calculator.hpp | 60 ++
.../datasketches/kll_quantile_calculator_impl.hpp | 190 ++++
be/src/thirdparty/datasketches/kll_sketch.hpp | 559 ++++++++++
be/src/thirdparty/datasketches/kll_sketch_impl.hpp | 1130 ++++++++++++++++++++
.../thirdparty/datasketches/memory_operations.hpp | 57 +
be/src/thirdparty/datasketches/serde.hpp | 196 ++++
29 files changed, 3256 insertions(+), 361 deletions(-)
diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt
index 7af6145..649be16 100644
--- a/be/src/exprs/CMakeLists.txt
+++ b/be/src/exprs/CMakeLists.txt
@@ -80,7 +80,7 @@ add_library(ExprsTests STATIC
)
add_dependencies(ExprsTests gen-deps)
-ADD_UNIFIED_BE_LSAN_TEST(datasketches-test "TestDataSketchesHll.*")
+ADD_UNIFIED_BE_LSAN_TEST(datasketches-test "TestDataSketchesHll.*:TestDataSketchesKll.*")
ADD_UNIFIED_BE_LSAN_TEST(expr-test "Instantiations/ExprTest.*")
# Exception to unified be tests: custom main initiailizes LLVM
ADD_BE_LSAN_TEST(expr-codegen-test)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 5b87d0b..09629b8 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1612,21 +1612,31 @@ BigIntVal AggregateFunctions::HllFinalize(FunctionContext* ctx, const StringVal&
return estimate;
}
-/// Auxiliary function that receives an input type that has a serialize_compact()
-/// function (e.g. hll_sketch or hll_union) and returns the serialized version of it
-/// wrapped into a StringVal.
-/// Introducing this variable in the .cc to avoid including the whole DataSketches HLL
+/// Auxiliary function that receives a hll_sketch and returns the serialized version of
+/// it wrapped into a StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
/// functionality into the header.
-template <typename T>
-StringVal SerializeCompactDsHll(FunctionContext* ctx, const T& input) {
+StringVal SerializeCompactDsHllSketch(FunctionContext* ctx,
+ const datasketches::hll_sketch& sketch) {
std::stringstream serialized_input;
- input.serialize_compact(serialized_input);
+ sketch.serialize_compact(serialized_input);
std::string serialized_input_str = serialized_input.str();
StringVal dst(ctx, serialized_input_str.size());
memcpy(dst.ptr, serialized_input_str.c_str(), serialized_input_str.size());
return dst;
}
+/// Auxiliary function that receives a hll_union, gets the underlying HLL sketch from the
+/// union object and returns the serialized, compacted HLL sketch wrapped into StringVal.
+/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
+/// functionality into the header.
+StringVal SerializeDsHllUnion(FunctionContext* ctx,
+ const datasketches::hll_union& ds_union) {
+ // Serialization is delegated to the compacted result sketch; no local stream needed.
+ datasketches::hll_sketch sketch = ds_union.get_result(DS_HLL_TYPE);
+ return SerializeCompactDsHllSketch(ctx, sketch);
+}
+
void AggregateFunctions::DsHllInit(FunctionContext* ctx, StringVal* dst) {
AllocBuffer(ctx, dst, sizeof(datasketches::hll_sketch));
if (UNLIKELY(dst->is_null)) {
@@ -1670,7 +1680,7 @@ StringVal AggregateFunctions::DsHllSerialize(FunctionContext* ctx,
DCHECK_EQ(src.len, sizeof(datasketches::hll_sketch));
datasketches::hll_sketch* sketch_ptr =
reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
- StringVal dst = SerializeCompactDsHll(ctx, *sketch_ptr);
+ StringVal dst = SerializeCompactDsHllSketch(ctx, *sketch_ptr);
ctx->Free(src.ptr);
return dst;
}
@@ -1711,7 +1721,7 @@ StringVal AggregateFunctions::DsHllFinalizeSketch(FunctionContext* ctx,
reinterpret_cast<datasketches::hll_sketch*>(src.ptr);
StringVal result_str = StringVal::null();
if (sketch_ptr->get_estimate() > 0.0) {
- result_str = SerializeCompactDsHll(ctx, *sketch_ptr);
+ result_str = SerializeCompactDsHllSketch(ctx, *sketch_ptr);
}
ctx->Free(src.ptr);
return result_str;
@@ -1751,7 +1761,7 @@ StringVal AggregateFunctions::DsHllUnionSerialize(FunctionContext* ctx,
DCHECK_EQ(src.len, sizeof(datasketches::hll_union));
datasketches::hll_union* union_ptr =
reinterpret_cast<datasketches::hll_union*>(src.ptr);
- StringVal dst = SerializeCompactDsHll(ctx, *union_ptr);
+ StringVal dst = SerializeDsHllUnion(ctx, *union_ptr);
ctx->Free(src.ptr);
return dst;
}
@@ -1762,13 +1772,14 @@ void AggregateFunctions::DsHllUnionMerge(
DCHECK(!dst->is_null);
DCHECK_EQ(dst->len, sizeof(datasketches::hll_union));
- datasketches::hll_union src_union =
- datasketches::hll_union::deserialize(reinterpret_cast<char*>(src.ptr), src.len);
+ // Note, 'src' is a serialized hll_sketch and not a serialized hll_union.
+ datasketches::hll_sketch src_sketch =
+ datasketches::hll_sketch::deserialize(reinterpret_cast<char*>(src.ptr), src.len);
datasketches::hll_union* dst_union_ptr =
reinterpret_cast<datasketches::hll_union*>(dst->ptr);
- dst_union_ptr->update(src_union.get_result(DS_HLL_TYPE));
+ dst_union_ptr->update(src_sketch);
}
StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
@@ -1782,7 +1793,7 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
ctx->Free(src.ptr);
return StringVal::null();
}
- StringVal result = SerializeCompactDsHll(ctx, sketch);
+ StringVal result = SerializeCompactDsHllSketch(ctx, sketch);
ctx->Free(src.ptr);
return result;
}
diff --git a/be/src/exprs/datasketches-test.cc b/be/src/exprs/datasketches-test.cc
index afd2505..a4175f7 100644
--- a/be/src/exprs/datasketches-test.cc
+++ b/be/src/exprs/datasketches-test.cc
@@ -16,8 +16,10 @@
// under the License.
#include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/kll_sketch.hpp"
#include <sstream>
+#include <stdlib.h>
#include "testutil/gtest-util.h"
@@ -29,8 +31,7 @@ namespace impala {
// The below code is mostly a copy-paste from the example code found on the official
// DataSketches web page: https://datasketches.apache.org/docs/HLL/HllCppExample.html
// The purpose is to create 2 HLL sketches that have overlap in their data, serialize
-// these sketches into files, deserialize them and give a cardinality estimate combining
-// the 2 sketches.
+// them, deserialize them and give a cardinality estimate combining the 2 sketches.
TEST(TestDataSketchesHll, UseDataSketchesInterface) {
const int lg_k = 11;
const auto type = datasketches::HLL_4;
@@ -69,4 +70,42 @@ TEST(TestDataSketchesHll, UseDataSketchesInterface) {
}
}
+
+// This test is meant to cover that the KLL algorithm from the DataSketches library can
+// be imported into Impala, builds without errors and the basic functionality is
+// available to use.
+// The below code is mostly a copy-paste from the example code found on the official
+// DataSketches web page:
+// https://datasketches.apache.org/docs/Quantiles/QuantilesCppExample.html
+// The purpose is to create 2 KLL sketches that have overlap in their data, serialize
+// them, deserialize them and get an estimate for quantiles after combining the 2
+// sketches.
+TEST(TestDataSketchesKll, UseDataSketchesInterface) {
+ std::stringstream sketch_stream1;
+ std::stringstream sketch_stream2;
+ {
+ datasketches::kll_sketch<float> sketch1;
+ for (int i = 0; i < 100000; ++i) sketch1.update(i);
+ sketch1.serialize(sketch_stream1);
+
+ datasketches::kll_sketch<float> sketch2;
+ for (int i = 30000; i < 130000; ++i) sketch2.update(i);
+ sketch2.serialize(sketch_stream2);
+ }
+
+ {
+ auto sketch1 = datasketches::kll_sketch<float>::deserialize(sketch_stream1);
+ auto sketch2 = datasketches::kll_sketch<float>::deserialize(sketch_stream2);
+ sketch1.merge(sketch2);
+
+ const double fractions[3] {0, 0.5, 1};
+ auto quantiles = sketch1.get_quantiles(fractions, 3);
+ EXPECT_EQ(0, quantiles[0]);
+ // The median is an approximate. Here we check that it is in 2% error range.
+ int exact_median = 65000;
+ EXPECT_LE(abs(quantiles[1] - exact_median), exact_median * 0.02);
+ EXPECT_EQ(129999, quantiles[2]);
+ }
+}
+
}
diff --git a/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp b/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
index 0d7db7a..9a8e135 100644
--- a/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
+++ b/be/src/thirdparty/datasketches/AuxHashMap-internal.hpp
@@ -75,7 +75,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(const void* bytes, size_t len,
const int* auxPtr = static_cast<const int*>(bytes);
if (srcCompact) {
if (len < auxCount * sizeof(int)) {
- throw std::invalid_argument("Input array too small to hold AuxHashMap image");
+ throw std::out_of_range("Input array too small to hold AuxHashMap image");
}
auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
for (int i = 0; i < auxCount; ++i) {
@@ -87,7 +87,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(const void* bytes, size_t len,
} else { // updatable
int itemsToRead = 1 << lgAuxArrInts;
if (len < itemsToRead * sizeof(int)) {
- throw std::invalid_argument("Input array too small to hold AuxHashMap image");
+ throw std::out_of_range("Input array too small to hold AuxHashMap image");
}
auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
for (int i = 0; i < itemsToRead; ++i) {
@@ -118,8 +118,10 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(std::istream& is, const int lgConfigK,
lgArrInts = lgAuxArrInts;
}
- // TODO: truncated stream will throw exception without freeing memory
AuxHashMap<A>* auxHashMap = new (ahmAlloc().allocate(1)) AuxHashMap<A>(lgArrInts, lgConfigK);
+ typedef std::unique_ptr<AuxHashMap<A>, std::function<void(AuxHashMap<A>*)>> aux_hash_map_ptr;
+ aux_hash_map_ptr aux_ptr(auxHashMap, auxHashMap->make_deleter());
+
int configKmask = (1 << lgConfigK) - 1;
if (srcCompact) {
@@ -147,7 +149,7 @@ AuxHashMap<A>* AuxHashMap<A>::deserialize(std::istream& is, const int lgConfigK,
throw std::invalid_argument("Deserialized AuxHashMap has wrong number of entries");
}
- return auxHashMap;
+ return aux_ptr.release();
}
template<typename A>
@@ -159,7 +161,7 @@ AuxHashMap<A>::~AuxHashMap<A>() {
template<typename A>
std::function<void(AuxHashMap<A>*)> AuxHashMap<A>::make_deleter() {
- return [](AuxHashMap<A>* ptr) {
+ return [](AuxHashMap<A>* ptr) {
ptr->~AuxHashMap();
ahmAlloc().deallocate(ptr, 1);
};
diff --git a/be/src/thirdparty/datasketches/CommonUtil.hpp b/be/src/thirdparty/datasketches/CommonUtil.hpp
deleted file mode 100644
index 55e7dfc..0000000
--- a/be/src/thirdparty/datasketches/CommonUtil.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// author Kevin Lang, Yahoo Research
-// author Jon Malkin, Yahoo Research
-
-#ifndef _COMMONUTIL_HPP_
-#define _COMMONUTIL_HPP_
-
-#include <cstdint>
-
-namespace datasketches {
-
-class CommonUtil final {
- public:
- static unsigned int getNumberOfLeadingZeros(uint64_t x);
-};
-
-#define FCLZ_MASK_56 ((uint64_t) 0x00ffffffffffffff)
-#define FCLZ_MASK_48 ((uint64_t) 0x0000ffffffffffff)
-#define FCLZ_MASK_40 ((uint64_t) 0x000000ffffffffff)
-#define FCLZ_MASK_32 ((uint64_t) 0x00000000ffffffff)
-#define FCLZ_MASK_24 ((uint64_t) 0x0000000000ffffff)
-#define FCLZ_MASK_16 ((uint64_t) 0x000000000000ffff)
-#define FCLZ_MASK_08 ((uint64_t) 0x00000000000000ff)
-
-inline unsigned int CommonUtil::getNumberOfLeadingZeros(const uint64_t x) {
- if (x == 0)
- return 64;
-
- static const uint8_t clzByteCount[256] = {8,7,6,6,5,5,5,5,4,4,4,4,4,4,4,4,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,3,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, [...]
-
- if (x > FCLZ_MASK_56)
- return ((unsigned int) ( 0 + clzByteCount[(x >> 56) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_48)
- return ((unsigned int) ( 8 + clzByteCount[(x >> 48) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_40)
- return ((unsigned int) (16 + clzByteCount[(x >> 40) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_32)
- return ((unsigned int) (24 + clzByteCount[(x >> 32) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_24)
- return ((unsigned int) (32 + clzByteCount[(x >> 24) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_16)
- return ((unsigned int) (40 + clzByteCount[(x >> 16) & FCLZ_MASK_08]));
- if (x > FCLZ_MASK_08)
- return ((unsigned int) (48 + clzByteCount[(x >> 8) & FCLZ_MASK_08]));
- if (1)
- return ((unsigned int) (56 + clzByteCount[(x >> 0) & FCLZ_MASK_08]));
-
-}
-
-
-}
-
-#endif // _COMMONUTIL_HPP_
\ No newline at end of file
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp b/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
index a3d0302..10aa047 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
+++ b/be/src/thirdparty/datasketches/CompositeInterpolationXTable-internal.hpp
@@ -36,7 +36,7 @@ static const int yStrides[] =
{1, 2, 3, 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480, 40960, 81920};
template<typename A>
-const int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
+int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
if (logK < HllUtil<A>::MIN_LOG_K || logK > HllUtil<A>::MAX_LOG_K) {
throw std::invalid_argument("logK must be in range [" + std::to_string(HllUtil<A>::MIN_LOG_K)
+ ", " + std::to_string(HllUtil<A>::MAX_LOG_K) + "]. Found: "
@@ -46,7 +46,7 @@ const int CompositeInterpolationXTable<A>::get_y_stride(const int logK) {
}
template<typename A>
-const int CompositeInterpolationXTable<A>::get_x_arr_length(const int logK) {
+int CompositeInterpolationXTable<A>::get_x_arr_length() {
return numXArrValues;
}
@@ -797,7 +797,7 @@ static const double xArr[18][numXArrValues] = {
};
template<typename A>
-const double* const CompositeInterpolationXTable<A>::get_x_arr(const int logK) {
+const double* CompositeInterpolationXTable<A>::get_x_arr(const int logK) {
if (logK < HllUtil<A>::MIN_LOG_K || logK > HllUtil<A>::MAX_LOG_K) {
throw std::invalid_argument("logK must be in range [" + std::to_string(HllUtil<A>::MIN_LOG_K)
+ ", " + std::to_string(HllUtil<A>::MAX_LOG_K) + "]. Found: "
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp b/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
index 67563c5..8baecbe 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
+++ b/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
@@ -27,10 +27,10 @@ namespace datasketches {
template<typename A = std::allocator<char>>
class CompositeInterpolationXTable {
public:
- static const int get_y_stride(int logK);
+ static int get_y_stride(int logK);
- static const double* const get_x_arr(int logK);
- static const int get_x_arr_length(int logK);
+ static const double* get_x_arr(int logK);
+ static int get_x_arr_length();
};
}
diff --git a/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp b/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
index 84c8199..35facfe 100644
--- a/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
+++ b/be/src/thirdparty/datasketches/CouponHashSet-internal.hpp
@@ -63,7 +63,7 @@ std::function<void(HllSketchImpl<A>*)> CouponHashSet<A>::get_deleter() const {
template<typename A>
CouponHashSet<A>* CouponHashSet<A>::newSet(const void* bytes, size_t len) {
if (len < HllUtil<A>::HASH_SET_INT_ARR_START) { // hard-coded
- throw std::invalid_argument("Input data length insufficient to hold CouponHashSet");
+ throw std::out_of_range("Input data length insufficient to hold CouponHashSet");
}
const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -102,12 +102,11 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(const void* bytes, size_t len) {
const int couponsInArray = (compactFlag ? couponCount : (1 << lgArrInts));
const size_t expectedLength = HllUtil<A>::HASH_SET_INT_ARR_START + (couponsInArray * sizeof(int));
if (len < expectedLength) {
- throw std::invalid_argument("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+ throw std::out_of_range("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+ ", found: " + std::to_string(len));
}
CouponHashSet<A>* sketch = new (chsAlloc().allocate(1)) CouponHashSet<A>(lgK, tgtHllType);
- sketch->putOutOfOrderFlag(true);
if (compactFlag) {
const uint8_t* curPos = data + HllUtil<A>::HASH_SET_INT_ARR_START;
@@ -170,7 +169,8 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(std::istream& is) {
}
CouponHashSet<A>* sketch = new (chsAlloc().allocate(1)) CouponHashSet<A>(lgK, tgtHllType);
- sketch->putOutOfOrderFlag(true);
+ typedef std::unique_ptr<CouponHashSet<A>, std::function<void(HllSketchImpl<A>*)>> coupon_hash_set_ptr;
+ coupon_hash_set_ptr ptr(sketch, sketch->get_deleter());
// Don't set couponCount here;
// we'll set later if updatable, and increment with updates if compact
@@ -181,18 +181,19 @@ CouponHashSet<A>* CouponHashSet<A>::newSet(std::istream& is) {
sketch->couponUpdate(coupon);
}
} else {
- int* oldArr = sketch->couponIntArr;
- const size_t oldArrLen = 1 << sketch->lgCouponArrInts;
- sketch->lgCouponArrInts = lgArrInts;
typedef typename std::allocator_traits<A>::template rebind_alloc<int> intAlloc;
+ intAlloc().deallocate(sketch->couponIntArr, 1 << sketch->lgCouponArrInts);
+ sketch->lgCouponArrInts = lgArrInts;
sketch->couponIntArr = intAlloc().allocate(1 << lgArrInts);
sketch->couponCount = couponCount;
// for stream processing, read entire list so read pointer ends up set correctly
is.read((char*)sketch->couponIntArr, (1 << sketch->lgCouponArrInts) * sizeof(int));
- intAlloc().deallocate(oldArr, oldArrLen);
}
- return sketch;
+ if (!is.good())
+ throw std::runtime_error("error reading from std::istream");
+
+ return ptr.release();
}
template<typename A>
diff --git a/be/src/thirdparty/datasketches/CouponList-internal.hpp b/be/src/thirdparty/datasketches/CouponList-internal.hpp
index f9d3ca0..1800a37 100644
--- a/be/src/thirdparty/datasketches/CouponList-internal.hpp
+++ b/be/src/thirdparty/datasketches/CouponList-internal.hpp
@@ -34,11 +34,10 @@ CouponList<A>::CouponList(const int lgConfigK, const target_hll_type tgtHllType,
: HllSketchImpl<A>(lgConfigK, tgtHllType, mode, false) {
if (mode == hll_mode::LIST) {
lgCouponArrInts = HllUtil<A>::LG_INIT_LIST_SIZE;
- oooFlag = false;
} else { // mode == SET
lgCouponArrInts = HllUtil<A>::LG_INIT_SET_SIZE;
- oooFlag = true;
}
+ oooFlag = false;
const int arrayLen = 1 << lgCouponArrInts;
typedef typename std::allocator_traits<A>::template rebind_alloc<int> intAlloc;
couponIntArr = intAlloc().allocate(arrayLen);
@@ -100,7 +99,7 @@ CouponList<A>* CouponList<A>::copyAs(target_hll_type tgtHllType) const {
template<typename A>
CouponList<A>* CouponList<A>::newList(const void* bytes, size_t len) {
if (len < HllUtil<A>::LIST_INT_ARR_START) {
- throw std::invalid_argument("Input data length insufficient to hold CouponHashSet");
+ throw std::out_of_range("Input data length insufficient to hold CouponHashSet");
}
const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -130,7 +129,7 @@ CouponList<A>* CouponList<A>::newList(const void* bytes, size_t len) {
const int couponsInArray = (compact ? couponCount : (1 << HllUtil<A>::computeLgArrInts(LIST, couponCount, lgK)));
const size_t expectedLength = HllUtil<A>::LIST_INT_ARR_START + (couponsInArray * sizeof(int));
if (len < expectedLength) {
- throw std::invalid_argument("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+ throw std::out_of_range("Byte array too short for sketch. Expected " + std::to_string(expectedLength)
+ ", found: " + std::to_string(len));
}
@@ -174,6 +173,8 @@ CouponList<A>* CouponList<A>::newList(std::istream& is) {
const bool emptyFlag = ((listHeader[HllUtil<A>::FLAGS_BYTE] & HllUtil<A>::EMPTY_FLAG_MASK) ? true : false);
CouponList<A>* sketch = new (clAlloc().allocate(1)) CouponList<A>(lgK, tgtHllType, mode);
+ typedef std::unique_ptr<CouponList<A>, std::function<void(HllSketchImpl<A>*)>> coupon_list_ptr;
+ coupon_list_ptr ptr(sketch, sketch->get_deleter());
const int couponCount = listHeader[HllUtil<A>::LIST_COUNT_BYTE];
sketch->couponCount = couponCount;
sketch->putOutOfOrderFlag(oooFlag); // should always be false for LIST
@@ -186,7 +187,10 @@ CouponList<A>* CouponList<A>::newList(std::istream& is) {
is.read((char*)sketch->couponIntArr, numToRead * sizeof(int));
}
- return sketch;
+ if (!is.good())
+ throw std::runtime_error("error reading from std::istream");
+
+ return ptr.release();
}
template<typename A>
@@ -296,9 +300,9 @@ HllSketchImpl<A>* CouponList<A>::couponUpdate(int coupon) {
++couponCount;
if (couponCount >= len) { // array full
if (this->lgConfigK < 8) {
- return promoteHeapListOrSetToHll(*this); // oooFlag = false
+ return promoteHeapListOrSetToHll(*this);
}
- return promoteHeapListToSet(*this); // oooFlag = true;
+ return promoteHeapListToSet(*this);
}
return this;
}
diff --git a/be/src/thirdparty/datasketches/Hll4Array-internal.hpp b/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
index 30b24f7..8498bb8 100644
--- a/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
+++ b/be/src/thirdparty/datasketches/Hll4Array-internal.hpp
@@ -175,7 +175,7 @@ void Hll4Array<A>::internalHll4Update(const int slotNo, const int newVal) {
//assert(shiftedNewValue >= 0);
if (rawStoredOldValue == HllUtil<A>::AUX_TOKEN) { // 879
- // Given that we have an AUX_TOKEN, tehre are 4 cases for how to
+ // Given that we have an AUX_TOKEN, there are 4 cases for how to
// actually modify the data structure
if (shiftedNewValue >= HllUtil<A>::AUX_TOKEN) { // case 1: 881
diff --git a/be/src/thirdparty/datasketches/HllArray-internal.hpp b/be/src/thirdparty/datasketches/HllArray-internal.hpp
index b8c0a57..0a4bdce 100644
--- a/be/src/thirdparty/datasketches/HllArray-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllArray-internal.hpp
@@ -95,7 +95,7 @@ HllArray<A>* HllArray<A>::copyAs(const target_hll_type tgtHllType) const {
template<typename A>
HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
if (len < HllUtil<A>::HLL_BYTE_ARR_START) {
- throw std::invalid_argument("Input data length insufficient to hold HLL array");
+ throw std::out_of_range("Input data length insufficient to hold HLL array");
}
const uint8_t* data = static_cast<const uint8_t*>(bytes);
@@ -124,7 +124,7 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
const int arrayBytes = hllArrBytes(tgtHllType, lgK);
if (len < static_cast<size_t>(HllUtil<A>::HLL_BYTE_ARR_START + arrayBytes)) {
- throw std::invalid_argument("Input array too small to hold sketch image");
+ throw std::out_of_range("Input array too small to hold sketch image");
}
double hip, kxq0, kxq1;
@@ -137,17 +137,20 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
std::memcpy(&auxCount, data + HllUtil<A>::AUX_COUNT_INT, sizeof(int));
AuxHashMap<A>* auxHashMap = nullptr;
+ typedef std::unique_ptr<AuxHashMap<A>, std::function<void(AuxHashMap<A>*)>> aux_hash_map_ptr;
+ aux_hash_map_ptr aux_ptr;
if (auxCount > 0) { // necessarily TgtHllType == HLL_4
int auxLgIntArrSize = (int) data[4];
const size_t offset = HllUtil<A>::HLL_BYTE_ARR_START + arrayBytes;
const uint8_t* auxDataStart = data + offset;
auxHashMap = AuxHashMap<A>::deserialize(auxDataStart, len - offset, lgK, auxCount, auxLgIntArrSize, comapctFlag);
+ aux_ptr = aux_hash_map_ptr(auxHashMap, auxHashMap->make_deleter());
}
HllArray<A>* sketch = HllSketchImplFactory<A>::newHll(lgK, tgtHllType, startFullSizeFlag);
sketch->putCurMin(curMin);
sketch->putOutOfOrderFlag(oooFlag);
- sketch->putHipAccum(hip);
+ if (!oooFlag) sketch->putHipAccum(hip);
sketch->putKxQ0(kxq0);
sketch->putKxQ1(kxq1);
sketch->putNumAtCurMin(numAtCurMin);
@@ -157,6 +160,7 @@ HllArray<A>* HllArray<A>::newHll(const void* bytes, size_t len) {
if (auxHashMap != nullptr)
((Hll4Array<A>*)sketch)->putAuxHashMap(auxHashMap);
+ aux_ptr.release();
return sketch;
}
@@ -188,8 +192,9 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
const int lgK = (int) listHeader[HllUtil<A>::LG_K_BYTE];
const int curMin = (int) listHeader[HllUtil<A>::HLL_CUR_MIN_BYTE];
- // TODO: truncated stream will throw exception without freeing memory
HllArray* sketch = HllSketchImplFactory<A>::newHll(lgK, tgtHllType, startFullSizeFlag);
+ typedef std::unique_ptr<HllArray<A>, std::function<void(HllSketchImpl<A>*)>> hll_array_ptr;
+ hll_array_ptr sketch_ptr(sketch, sketch->get_deleter());
sketch->putCurMin(curMin);
sketch->putOutOfOrderFlag(oooFlag);
@@ -197,7 +202,7 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
is.read((char*)&hip, sizeof(hip));
is.read((char*)&kxq0, sizeof(kxq0));
is.read((char*)&kxq1, sizeof(kxq1));
- sketch->putHipAccum(hip);
+ if (!oooFlag) sketch->putHipAccum(hip);
sketch->putKxQ0(kxq0);
sketch->putKxQ1(kxq1);
@@ -214,7 +219,10 @@ HllArray<A>* HllArray<A>::newHll(std::istream& is) {
((Hll4Array<A>*)sketch)->putAuxHashMap(auxHashMap);
}
- return sketch;
+ if (!is.good())
+ throw std::runtime_error("error reading from std::istream");
+
+ return sketch_ptr.release();
}
template<typename A>
@@ -405,7 +413,7 @@ double HllArray<A>::getCompositeEstimate() const {
const double rawEst = getHllRawEstimate(this->lgConfigK, kxq0 + kxq1);
const double* xArr = CompositeInterpolationXTable<A>::get_x_arr(this->lgConfigK);
- const int xArrLen = CompositeInterpolationXTable<A>::get_x_arr_length(this->lgConfigK);
+ const int xArrLen = CompositeInterpolationXTable<A>::get_x_arr_length();
const double yStride = CompositeInterpolationXTable<A>::get_y_stride(this->lgConfigK);
if (rawEst < xArr[0]) {
@@ -588,7 +596,7 @@ template<typename A>
void HllArray<A>::hipAndKxQIncrementalUpdate(uint8_t oldValue, uint8_t newValue) {
const int configK = 1 << this->getLgConfigK();
// update hip BEFORE updating kxq
- hipAccum += configK / (kxq0 + kxq1);
+ if (!oooFlag) hipAccum += configK / (kxq0 + kxq1);
// update kxq0 and kxq1; subtract first, then add
if (oldValue < 32) { kxq0 -= INVERSE_POWERS_OF_2[oldValue]; }
else { kxq1 -= INVERSE_POWERS_OF_2[oldValue]; }
diff --git a/be/src/thirdparty/datasketches/HllSketch-internal.hpp b/be/src/thirdparty/datasketches/HllSketch-internal.hpp
index 6587fe8..dd16955 100644
--- a/be/src/thirdparty/datasketches/HllSketch-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllSketch-internal.hpp
@@ -25,6 +25,7 @@
#include "HllSketchImplFactory.hpp"
#include "CouponList.hpp"
#include "HllArray.hpp"
+#include "common_defs.hpp"
#include <cstdio>
#include <cstdlib>
@@ -73,11 +74,6 @@ hll_sketch_alloc<A>::~hll_sketch_alloc() {
}
template<typename A>
-std::ostream& operator<<(std::ostream& os, const hll_sketch_alloc<A>& sketch) {
- return sketch.to_string(os, true, true, false, false);
-}
-
-template<typename A>
hll_sketch_alloc<A>::hll_sketch_alloc(const hll_sketch_alloc<A>& that) :
sketch_impl(that.sketch_impl->copy())
{}
@@ -123,7 +119,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const std::string& datum) {
if (datum.empty()) { return; }
HashState hashResult;
- HllUtil<A>::hash(datum.c_str(), datum.length(), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(datum.c_str(), datum.length(), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -131,7 +127,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const uint64_t datum) {
// no sign extension with 64 bits so no need to cast to signed value
HashState hashResult;
- HllUtil<A>::hash(&datum, sizeof(uint64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&datum, sizeof(uint64_t), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -153,7 +149,7 @@ void hll_sketch_alloc<A>::update(const uint8_t datum) {
template<typename A>
void hll_sketch_alloc<A>::update(const int64_t datum) {
HashState hashResult;
- HllUtil<A>::hash(&datum, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&datum, sizeof(int64_t), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -161,7 +157,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const int32_t datum) {
int64_t val = static_cast<int64_t>(datum);
HashState hashResult;
- HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -169,7 +165,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const int16_t datum) {
int64_t val = static_cast<int64_t>(datum);
HashState hashResult;
- HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -177,7 +173,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const int8_t datum) {
int64_t val = static_cast<int64_t>(datum);
HashState hashResult;
- HllUtil<A>::hash(&val, sizeof(int64_t), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&val, sizeof(int64_t), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -191,7 +187,7 @@ void hll_sketch_alloc<A>::update(const double datum) {
d.longBytes = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
}
HashState hashResult;
- HllUtil<A>::hash(&d, sizeof(double), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&d, sizeof(double), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -205,7 +201,7 @@ void hll_sketch_alloc<A>::update(const float datum) {
d.longBytes = 0x7ff8000000000000L; // canonicalize NaN using value from Java's Double.doubleToLongBits()
}
HashState hashResult;
- HllUtil<A>::hash(&d, sizeof(double), HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(&d, sizeof(double), DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -213,7 +209,7 @@ template<typename A>
void hll_sketch_alloc<A>::update(const void* data, const size_t lengthBytes) {
if (data == nullptr) { return; }
HashState hashResult;
- HllUtil<A>::hash(data, lengthBytes, HllUtil<A>::DEFAULT_UPDATE_SEED, hashResult);
+ HllUtil<A>::hash(data, lengthBytes, DEFAULT_SEED, hashResult);
coupon_update(HllUtil<A>::coupon(hashResult));
}
@@ -248,21 +244,11 @@ vector_u8<A> hll_sketch_alloc<A>::serialize_updatable() const {
}
template<typename A>
-std::string hll_sketch_alloc<A>::to_string(const bool summary,
- const bool detail,
- const bool aux_detail,
- const bool all) const {
- std::ostringstream oss;
- to_string(oss, summary, detail, aux_detail, all);
- return oss.str();
-}
-
-template<typename A>
-std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
- const bool summary,
- const bool detail,
- const bool aux_detail,
- const bool all) const {
+string<A> hll_sketch_alloc<A>::to_string(const bool summary,
+ const bool detail,
+ const bool aux_detail,
+ const bool all) const {
+ std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
if (summary) {
os << "### HLL sketch summary:" << std::endl
<< " Log Config K : " << get_lg_config_k() << std::endl
@@ -279,6 +265,10 @@ std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
<< " HipAccum : " << hllArray->getHipAccum() << std::endl
<< " KxQ0 : " << hllArray->getKxQ0() << std::endl
<< " KxQ1 : " << hllArray->getKxQ1() << std::endl;
+ if (get_target_type() == HLL_4) {
+ const Hll4Array<A>* hll4_ptr = static_cast<const Hll4Array<A>*>(sketch_impl);
+ os << " Aux table? : " << (hll4_ptr->getAuxHashMap() != nullptr ? "true" : "false") << std::endl;
+ }
} else {
os << " Coupon count : "
<< std::to_string(((CouponList<A>*) sketch_impl)->getCouponCount()) << std::endl;
@@ -350,7 +340,7 @@ std::ostream& hll_sketch_alloc<A>::to_string(std::ostream& os,
}
}
- return os;
+ return os.str();
}
template<typename A>
diff --git a/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp b/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
index eae6f75..eb8dd77 100644
--- a/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
+++ b/be/src/thirdparty/datasketches/HllSketchImplFactory.hpp
@@ -56,7 +56,6 @@ CouponHashSet<A>* HllSketchImplFactory<A>::promoteListToSet(const CouponList<A>&
for (auto coupon: list) {
chSet->couponUpdate(coupon);
}
- chSet->putOutOfOrderFlag(true);
return chSet;
}
diff --git a/be/src/thirdparty/datasketches/HllUnion-internal.hpp b/be/src/thirdparty/datasketches/HllUnion-internal.hpp
index 0d039f2..5aa184f 100644
--- a/be/src/thirdparty/datasketches/HllUnion-internal.hpp
+++ b/be/src/thirdparty/datasketches/HllUnion-internal.hpp
@@ -66,11 +66,6 @@ hll_union_alloc<A> hll_union_alloc<A>::deserialize(std::istream& is) {
}
template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_union_alloc<A>& hllUnion) {
- return hllUnion.to_string(os, true, true, false, false);
-}
-
-template<typename A>
hll_sketch_alloc<A> hll_union_alloc<A>::get_result(target_hll_type target_type) const {
return hll_sketch_alloc<A>(gadget, target_type);
}
@@ -163,38 +158,6 @@ void hll_union_alloc<A>::coupon_update(const int coupon) {
}
template<typename A>
-vector_u8<A> hll_union_alloc<A>::serialize_compact() const {
- return gadget.serialize_compact();
-}
-
-template<typename A>
-vector_u8<A> hll_union_alloc<A>::serialize_updatable() const {
- return gadget.serialize_updatable();
-}
-
-template<typename A>
-void hll_union_alloc<A>::serialize_compact(std::ostream& os) const {
- return gadget.serialize_compact(os);
-}
-
-template<typename A>
-void hll_union_alloc<A>::serialize_updatable(std::ostream& os) const {
- return gadget.serialize_updatable(os);
-}
-
-template<typename A>
-std::ostream& hll_union_alloc<A>::to_string(std::ostream& os, const bool summary,
- const bool detail, const bool aux_detail, const bool all) const {
- return gadget.to_string(os, summary, detail, aux_detail, all);
-}
-
-template<typename A>
-std::string hll_union_alloc<A>::to_string(const bool summary, const bool detail,
- const bool aux_detail, const bool all) const {
- return gadget.to_string(summary, detail, aux_detail, all);
-}
-
-template<typename A>
double hll_union_alloc<A>::get_estimate() const {
return gadget.get_estimate();
}
@@ -315,7 +278,7 @@ void hll_union_alloc<A>::union_impl(const hll_sketch_alloc<A>& sketch, const int
if (src_impl->getCurMode() == LIST || src_impl->getCurMode() == SET) {
if (dst_impl->isEmpty() && src_impl->getLgConfigK() == dst_impl->getLgConfigK()) {
dst_impl = src_impl->copyAs(HLL_8);
- gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+ gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
} else {
const CouponList<A>* src = static_cast<const CouponList<A>*>(src_impl);
for (auto coupon: *src) {
@@ -329,21 +292,22 @@ void hll_union_alloc<A>::union_impl(const hll_sketch_alloc<A>& sketch, const int
const CouponList<A>* src = static_cast<const CouponList<A>*>(dst_impl);
dst_impl = copy_or_downsample(src_impl, lg_max_k);
static_cast<Hll8Array<A>*>(dst_impl)->mergeList(*src);
- gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+ gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
} else { // gadget is HLL
if (src_impl->getLgConfigK() < dst_impl->getLgConfigK()) {
dst_impl = copy_or_downsample(dst_impl, sketch.get_lg_config_k());
- gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+ gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
}
const HllArray<A>* src = static_cast<const HllArray<A>*>(src_impl);
static_cast<Hll8Array<A>*>(dst_impl)->mergeHll(*src);
dst_impl->putOutOfOrderFlag(true);
+ static_cast<Hll8Array<A>*>(dst_impl)->putHipAccum(0);
}
} else { // src is HLL, gadget is empty
dst_impl = copy_or_downsample(src_impl, lg_max_k);
- gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget replaced
+ gadget.sketch_impl->get_deleter()(gadget.sketch_impl); // gadget to be replaced
}
- gadget.sketch_impl = dst_impl;
+ gadget.sketch_impl = dst_impl; // gadget replaced
}
}
diff --git a/be/src/thirdparty/datasketches/HllUtil.hpp b/be/src/thirdparty/datasketches/HllUtil.hpp
index eb956bc..1bea84a 100644
--- a/be/src/thirdparty/datasketches/HllUtil.hpp
+++ b/be/src/thirdparty/datasketches/HllUtil.hpp
@@ -22,7 +22,8 @@
#include "MurmurHash3.h"
#include "RelativeErrorTables.hpp"
-#include "CommonUtil.hpp"
+#include "count_zeros.hpp"
+#include "common_defs.hpp"
#include <cmath>
#include <stdexcept>
@@ -83,8 +84,6 @@ public:
static const int MIN_LOG_K = 4;
static const int MAX_LOG_K = 21;
- static const uint64_t DEFAULT_UPDATE_SEED = 9001L;
-
static const double HLL_HIP_RSE_FACTOR; // sqrt(log(2.0)) = 0.8325546
static const double HLL_NON_HIP_RSE_FACTOR; // sqrt((3.0 * log(2.0)) - 1.0) = 1.03896
static const double COUPON_RSE_FACTOR; // 0.409 at transition point not the asymptote
@@ -108,7 +107,6 @@ public:
static int coupon(const uint64_t hash[]);
static int coupon(const HashState& hashState);
static void hash(const void* key, int keyLen, uint64_t seed, HashState& result);
-
static int checkLgK(int lgK);
static void checkMemSize(uint64_t minBytes, uint64_t capBytes);
static inline void checkNumStdDev(int numStdDev);
@@ -118,8 +116,6 @@ public:
static double invPow2(int e);
static unsigned int ceilingPowerOf2(unsigned int n);
static unsigned int simpleIntLog2(unsigned int n); // n must be power of 2
- static unsigned int getNumberOfLeadingZeros(uint64_t x);
- static unsigned int numberOfTrailingZeros(uint32_t n);
static int computeLgArrInts(hll_mode mode, int count, int lgConfigK);
static double getRelErr(bool upperBound, bool unioned,
int lgConfigK, int numStdDev);
@@ -144,7 +140,7 @@ const int HllUtil<A>::LG_AUX_ARR_INTS[] = {
template<typename A>
inline int HllUtil<A>::coupon(const uint64_t hash[]) {
int addr26 = (int) (hash[0] & KEY_MASK_26);
- int lz = CommonUtil::getNumberOfLeadingZeros(hash[1]);
+ int lz = count_leading_zeros_in_u64(hash[1]);
int value = ((lz > 62 ? 62 : lz) + 1);
return (value << KEY_BITS_26) | addr26;
}
@@ -152,14 +148,14 @@ inline int HllUtil<A>::coupon(const uint64_t hash[]) {
template<typename A>
inline int HllUtil<A>::coupon(const HashState& hashState) {
int addr26 = (int) (hashState.h1 & KEY_MASK_26);
- int lz = CommonUtil::getNumberOfLeadingZeros(hashState.h2);
+ int lz = count_leading_zeros_in_u64(hashState.h2);
int value = ((lz > 62 ? 62 : lz) + 1);
return (value << KEY_BITS_26) | addr26;
}
template<typename A>
inline void HllUtil<A>::hash(const void* key, const int keyLen, const uint64_t seed, HashState& result) {
- MurmurHash3_x64_128(key, keyLen, DEFAULT_UPDATE_SEED, result);
+ MurmurHash3_x64_128(key, keyLen, seed, result);
}
template<typename A>
@@ -219,7 +215,7 @@ inline double HllUtil<A>::invPow2(const int e) {
// compute the next highest power of 2 of 32-bit n
// taken from https://graphics.stanford.edu/~seander/bithacks.html
template<typename A>
-inline unsigned int HllUtil<A>::ceilingPowerOf2(unsigned int n) {
+inline uint32_t HllUtil<A>::ceilingPowerOf2(uint32_t n) {
--n;
n |= n >> 1;
n |= n >> 2;
@@ -230,45 +226,11 @@ inline unsigned int HllUtil<A>::ceilingPowerOf2(unsigned int n) {
}
template<typename A>
-inline unsigned int HllUtil<A>::simpleIntLog2(unsigned int n) {
+inline uint32_t HllUtil<A>::simpleIntLog2(uint32_t n) {
if (n == 0) {
throw std::logic_error("cannot take log of 0");
}
- const unsigned int e = numberOfTrailingZeros(n);
- return e;
-}
-
-// taken from https://graphics.stanford.edu/~seander/bithacks.html
-// input is 32-bit word to count zero bits on right
-template<typename A>
-inline unsigned int HllUtil<A>::numberOfTrailingZeros(uint32_t v) {
- unsigned int c; // c will be the number of zero bits on the right,
- // so if v is 1101000 (base 2), then c will be 3
- // NOTE: if 0 == v, then c = 31.
- if (v & 0x1) {
- // special case for odd v (assumed to happen half of the time)
- c = 0;
- } else {
- c = 1;
- if ((v & 0xffff) == 0) {
- v >>= 16;
- c += 16;
- }
- if ((v & 0xff) == 0) {
- v >>= 8;
- c += 8;
- }
- if ((v & 0xf) == 0) {
- v >>= 4;
- c += 4;
- }
- if ((v & 0x3) == 0) {
- v >>= 2;
- c += 2;
- }
- c -= v & 0x1;
- }
- return c;
+ return count_trailing_zeros_in_u32(n);
}
template<typename A>
diff --git a/be/src/thirdparty/datasketches/MurmurHash3.h b/be/src/thirdparty/datasketches/MurmurHash3.h
index f68e989..b438c7d 100644
--- a/be/src/thirdparty/datasketches/MurmurHash3.h
+++ b/be/src/thirdparty/datasketches/MurmurHash3.h
@@ -132,22 +132,22 @@ FORCE_INLINE void MurmurHash3_x64_128(const void* key, int lenBytes, uint64_t se
switch(lenBytes & 15)
{
- case 15: k2 ^= ((uint64_t)tail[14]) << 48; //@suppress("No break at end of case")
- case 14: k2 ^= ((uint64_t)tail[13]) << 40; //@suppress("No break at end of case")
- case 13: k2 ^= ((uint64_t)tail[12]) << 32; //@suppress("No break at end of case")
- case 12: k2 ^= ((uint64_t)tail[11]) << 24; //@suppress("No break at end of case")
- case 11: k2 ^= ((uint64_t)tail[10]) << 16; //@suppress("No break at end of case")
- case 10: k2 ^= ((uint64_t)tail[ 9]) << 8; //@suppress("No break at end of case")
+ case 15: k2 ^= ((uint64_t)tail[14]) << 48; // falls through
+ case 14: k2 ^= ((uint64_t)tail[13]) << 40; // falls through
+ case 13: k2 ^= ((uint64_t)tail[12]) << 32; // falls through
+ case 12: k2 ^= ((uint64_t)tail[11]) << 24; // falls through
+ case 11: k2 ^= ((uint64_t)tail[10]) << 16; // falls through
+ case 10: k2 ^= ((uint64_t)tail[ 9]) << 8; // falls through
case 9: k2 ^= ((uint64_t)tail[ 8]) << 0;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; out.h2 ^= k2;
- //@suppress("No break at end of case")
- case 8: k1 ^= ((uint64_t)tail[ 7]) << 56; //@suppress("No break at end of case")
- case 7: k1 ^= ((uint64_t)tail[ 6]) << 48; //@suppress("No break at end of case")
- case 6: k1 ^= ((uint64_t)tail[ 5]) << 40; //@suppress("No break at end of case")
- case 5: k1 ^= ((uint64_t)tail[ 4]) << 32; //@suppress("No break at end of case")
- case 4: k1 ^= ((uint64_t)tail[ 3]) << 24; //@suppress("No break at end of case")
- case 3: k1 ^= ((uint64_t)tail[ 2]) << 16; //@suppress("No break at end of case")
- case 2: k1 ^= ((uint64_t)tail[ 1]) << 8; //@suppress("No break at end of case")
+ // falls through
+ case 8: k1 ^= ((uint64_t)tail[ 7]) << 56; // falls through
+ case 7: k1 ^= ((uint64_t)tail[ 6]) << 48; // falls through
+ case 6: k1 ^= ((uint64_t)tail[ 5]) << 40; // falls through
+ case 5: k1 ^= ((uint64_t)tail[ 4]) << 32; // falls through
+ case 4: k1 ^= ((uint64_t)tail[ 3]) << 24; // falls through
+ case 3: k1 ^= ((uint64_t)tail[ 2]) << 16; // falls through
+ case 2: k1 ^= ((uint64_t)tail[ 1]) << 8; // falls through
case 1: k1 ^= ((uint64_t)tail[ 0]) << 0;
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; out.h1 ^= k1;
};
diff --git a/be/src/thirdparty/datasketches/README.md b/be/src/thirdparty/datasketches/README.md
index 51e1d57..d5c56ce 100644
--- a/be/src/thirdparty/datasketches/README.md
+++ b/be/src/thirdparty/datasketches/README.md
@@ -1,14 +1,15 @@
-The content of this folder imports the functionality needed for HLL approximate
-algorithm from Apache DataSketches by copying the necessary files from that
-project into this folder. Note, that the original structure of files was
-changed during this process as originally hll/ and common/ libraries were
-both affected but I copied these into the same directory so that Impala can
-compile them without rewriting the include paths in the files themselves. Also
-note, that not the whole common/ directory was copied just the files needed for
-HLL.
+The content of this folder imports the functionality needed for HLL and KLL
+approximate algorithms from Apache DataSketches by copying the necessary files
+from that project into this folder. Note that the original directory structure was
+changed during this process; the files originally came from the following folders:
+ hll/include/
+ kll/include/
+ common/include/
+I copied the contents of these folders into this single directory so that Impala
+can compile them without rewriting the include paths in the files themselves.
The git hash of the snapshot I used as a source for the files:
-a6265b307a03085abe26c20413fdbf7d7a5eaf29
+c67d92faad3827932ca3b5d864222e64977f2c20
Browse the source files here:
https://github.com/apache/incubator-datasketches-cpp
diff --git a/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp b/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp
new file mode 100644
index 0000000..06ab484
--- /dev/null
+++ b/be/src/thirdparty/datasketches/bounds_binomial_proportions.hpp
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+#define _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
+
+#include <cmath>
+#include <stdexcept>
+
+namespace datasketches {
+
+/**
+ * Confidence intervals for binomial proportions.
+ *
+ * <p>This class computes an approximation to the Clopper-Pearson confidence interval
+ * for a binomial proportion. Exact Clopper-Pearson intervals are strictly
+ * conservative, but these approximations are not.</p>
+ *
+ * <p>The main inputs are numbers <i>n</i> and <i>k</i>, which are not the same as other things
+ * that are called <i>n</i> and <i>k</i> in our sketching library. There is also a third
+ * parameter, numStdDev, that specifies the desired confidence level.</p>
+ * <ul>
+ * <li><i>n</i> is the number of independent randomized trials. It is given and therefore known.
+ * </li>
+ * <li><i>p</i> is the probability of a trial being a success. It is unknown.</li>
+ * <li><i>k</i> is the number of trials (out of <i>n</i>) that turn out to be successes. It is
+ * a random variable governed by a binomial distribution. After any given
+ * batch of <i>n</i> independent trials, the random variable <i>k</i> has a specific
+ * value which is observed and is therefore known.</li>
+ * <li><i>pHat</i> = <i>k</i> / <i>n</i> is an unbiased estimate of the unknown success
+ * probability <i>p</i>.</li>
+ * </ul>
+ *
+ * <p>Alternatively, consider a coin with unknown heads probability <i>p</i>. Where
+ * <i>n</i> is the number of independent flips of that coin, and <i>k</i> is the number
+ * of times that the coin comes up heads during a given batch of <i>n</i> flips.
+ * This class computes a frequentist confidence interval [lowerBoundOnP, upperBoundOnP] for the
+ * unknown <i>p</i>.</p>
+ *
+ * <p>Conceptually, the desired confidence level is specified by a tail probability delta.</p>
+ *
+ * <p>Ideally, over a large ensemble of independent batches of trials,
+ * the fraction of batches in which the true <i>p</i> lies below lowerBoundOnP would be at most
+ * delta, and the fraction of batches in which the true <i>p</i> lies above upperBoundOnP
+ * would also be at most delta.
+ *
+ * <p>Setting aside the philosophical difficulties attaching to that statement, it isn't quite
+ * true because we are approximating the Clopper-Pearson interval.</p>
+ *
+ * <p>Finally, we point out that in this class's interface, the confidence parameter delta is
+ * not specified directly, but rather through a "number of standard deviations" numStdDev.
+ * The library effectively converts that to a delta via delta = normalCDF (-1.0 * numStdDev).</p>
+ *
+ * <p>It is perhaps worth emphasizing that the library is NOT merely adding and subtracting
+ * numStdDev standard deviations to the estimate. It is doing something better, that to some
+ * extent accounts for the fact that the binomial distribution has a non-gaussian shape.</p>
+ *
+ * <p>In particular, it is using an approximation to the inverse of the incomplete beta function
+ * that appears as formula 26.5.22 on page 945 of the "Handbook of Mathematical Functions"
+ * by Abramowitz and Stegun.</p>
+ *
+ * @author Kevin Lang
+ * @author Jon Malkin
+ */
+class bounds_binomial_proportions { // confidence intervals for binomial proportions
+
+public:
+ /**
+ * Computes lower bound of approximate Clopper-Pearson confidence interval for a binomial
+ * proportion.
+ *
+ * <p>Implementation Notes:<br>
+ * The approximateLowerBoundOnP is defined with respect to the right tail of the binomial
+ * distribution.</p>
+ * <ul>
+ * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,k,n</i></sub>bino(<i>j;n,p</i>)
+ * = delta.</li>
+ * <li>We now restate that in terms of the left tail.</li>
+ * <li>We want to solve for the p for which sum<sub><i>j,0,(k-1)</i></sub>bino(<i>j;n,p</i>)
+ * = 1 - delta.</li>
+ * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+ * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k+1,k)</i></sub> = 1 - delta.</li>
+ * <li>We specify 1-delta via numStdDevs through the right tail of the standard normal
+ * distribution.</li>
+ * <li>Smaller values of numStdDevs correspond to bigger values of 1-delta and hence to smaller
+ * values of delta. In fact, usefully small values of delta correspond to negative values of
+ * numStdDevs.</li>
+ * <li>return <i>p</i> = 1-<i>x</i>.</li>
+ * </ul>
+ *
+ * @param n is the number of trials. Must be non-negative.
+ * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+ * @param num_std_devs the number of standard deviations defining the confidence interval
+ * @return the lower bound of the approximate Clopper-Pearson confidence interval for the
+ * unknown success probability. Throws std::invalid_argument if n or k is invalid.
+ */
+ static inline double approximate_lower_bound_on_p(long n, long k, double num_std_devs) {
+ check_inputs(n, k);
+ if (n == 0) { return 0.0; } // the coin was never flipped, so we know nothing
+ else if (k == 0) { return 0.0; }
+ else if (k == 1) { return (exact_lower_bound_on_p_k_eq_1(n, delta_of_num_stdevs(num_std_devs))); }
+ else if (k == n) { return (exact_lower_bound_on_p_k_eq_n(n, delta_of_num_stdevs(num_std_devs))); }
+ else {
+ double x = abramowitz_stegun_formula_26p5p22((n - k) + 1, k, (-1.0 * num_std_devs));
+ return (1.0 - x); // which is p
+ }
+ }
+
+ /**
+ * Computes upper bound of approximate Clopper-Pearson confidence interval for a binomial
+ * proportion.
+ *
+ * <p>Implementation Notes:<br>
+ * The approximateUpperBoundOnP is defined with respect to the left tail of the binomial
+ * distribution.</p>
+ * <ul>
+ * <li>We want to solve for the <i>p</i> for which sum<sub><i>j,0,k</i></sub>bino(<i>j;n,p</i>)
+ * = delta.</li>
+ * <li>Define <i>x</i> = 1-<i>p</i>.</li>
+ * <li>We want to solve for the <i>x</i> for which I<sub><i>x(n-k,k+1)</i></sub> = delta.</li>
+ * <li>We specify delta via numStdDevs through the right tail of the standard normal
+ * distribution.</li>
+ * <li>Bigger values of numStdDevs correspond to smaller values of delta.</li>
+ * <li>return <i>p</i> = 1-<i>x</i>.</li>
+ * </ul>
+ * @param n is the number of trials. Must be non-negative.
+ * @param k is the number of successes. Must be non-negative, and cannot exceed <i>n</i>.
+ * @param num_std_devs the number of standard deviations defining the confidence interval
+ * @return the upper bound of the approximate Clopper-Pearson confidence interval for the
+ * unknown success probability. Throws std::invalid_argument if n or k is invalid.
+ */
+ static inline double approximate_upper_bound_on_p(long n, long k, double num_std_devs) {
+ check_inputs(n, k);
+ if (n == 0) { return 1.0; } // the coin was never flipped, so we know nothing
+ else if (k == n) { return 1.0; }
+ else if (k == (n - 1)) {
+ return (exact_upper_bound_on_p_k_eq_minusone(n, delta_of_num_stdevs(num_std_devs)));
+ }
+ else if (k == 0) {
+ return (exact_upper_bound_on_p_k_eq_zero(n, delta_of_num_stdevs(num_std_devs)));
+ }
+ else {
+ double x = abramowitz_stegun_formula_26p5p22(n - k, k + 1, num_std_devs);
+ return (1.0 - x); // which is p
+ }
+ }
+
+ /**
+ * Computes an estimate of an unknown binomial proportion.
+ * @param n is the number of trials. Must be non-negative.
+ * @param k is the number of successes. Must be non-negative, and cannot exceed n.
+ * @return the estimate k / n of the unknown binomial proportion, or 0.5 when n == 0.
+ */
+ static inline double estimate_unknown_p(long n, long k) {
+ check_inputs(n, k);
+ if (n == 0) { return 0.5; } // the coin was never flipped, so we know nothing
+ else { return ((double) k / (double) n); }
+ }
+
+ /**
+ * Computes an approximation to the erf() function.
+ * @param x is the input to the erf function
+ * @return returns erf(x), accurate to roughly 7 decimal digits.
+ */
+ static inline double erf(double x) {
+ if (x < 0.0) { return (-1.0 * (erf_of_nonneg(-1.0 * x))); }
+ else { return (erf_of_nonneg(x)); }
+ }
+
+ /**
+ * Computes an approximation to normal_cdf(x).
+ * @param x is the input to the normal_cdf function
+ * @return returns the approximation to normalCDF(x).
+ */
+ static inline double normal_cdf(double x) {
+ return (0.5 * (1.0 + (erf(x / (std::sqrt(2.0))))));
+ }
+
+private:
+ static inline void check_inputs(long n, long k) {
+ if (n < 0) { throw std::invalid_argument("N must be non-negative"); }
+ if (k < 0) { throw std::invalid_argument("K must be non-negative"); }
+ if (k > n) { throw std::invalid_argument("K cannot exceed N"); }
+ }
+
+ //@formatter:off
+ // Abramowitz and Stegun formula 7.1.28, p. 88; claims accuracy of about 7 decimal digits.
+ static inline double erf_of_nonneg(double x) {
+ // The constants that appear below, formatted for easy checking against the book.
+ // a1 = 0.07052 30784
+ // a3 = 0.00927 05272
+ // a5 = 0.00027 65672
+ // a2 = 0.04228 20123
+ // a4 = 0.00015 20143
+ // a6 = 0.00004 30638
+ static const double a1 = 0.0705230784;
+ static const double a3 = 0.0092705272;
+ static const double a5 = 0.0002765672;
+ static const double a2 = 0.0422820123;
+ static const double a4 = 0.0001520143;
+ static const double a6 = 0.0000430638;
+ const double x2 = x * x; // x squared, x cubed, etc.
+ const double x3 = x2 * x;
+ const double x4 = x2 * x2;
+ const double x5 = x2 * x3;
+ const double x6 = x3 * x3;
+ const double sum = ( 1.0
+ + (a1 * x)
+ + (a2 * x2)
+ + (a3 * x3)
+ + (a4 * x4)
+ + (a5 * x5)
+ + (a6 * x6) );
+ const double sum2 = sum * sum; // raise the sum to the 16th power
+ const double sum4 = sum2 * sum2;
+ const double sum8 = sum4 * sum4;
+ const double sum16 = sum8 * sum8;
+ return (1.0 - (1.0 / sum16));
+ }
+ // Tail probability delta = normal_cdf(-kappa), where kappa is a number of standard deviations.
+ static inline double delta_of_num_stdevs(double kappa) {
+ return (normal_cdf(-1.0 * kappa));
+ }
+
+ //@formatter:on
+ // Formula 26.5.22 on page 945 of Abramowitz & Stegun, which is an approximation
+ // of the inverse of the incomplete beta function I_x(a,b) = delta
+ // viewed as a scalar function of x.
+ // In other words, we specify delta, and it gives us x (with a and b held constant).
+ // However, delta is specified in an indirect way through yp which
+ // is the number of stdDevs that leaves delta probability in the right
+ // tail of a standard gaussian distribution.
+
+ // We point out that the variable names correspond to those in the book,
+ // and it is worth keeping it that way so that it will always be easy to verify
+ // that the formula was typed in correctly.
+
+ static inline double abramowitz_stegun_formula_26p5p22(double a, double b,
+ double yp) {
+ const double b2m1 = (2.0 * b) - 1.0;
+ const double a2m1 = (2.0 * a) - 1.0;
+ const double lambda = ((yp * yp) - 3.0) / 6.0;
+ const double htmp = (1.0 / a2m1) + (1.0 / b2m1);
+ const double h = 2.0 / htmp;
+ const double term1 = (yp * (std::sqrt(h + lambda))) / h;
+ const double term2 = (1.0 / b2m1) - (1.0 / a2m1);
+ const double term3 = (lambda + (5.0 / 6.0)) - (2.0 / (3.0 * h));
+ const double w = term1 - (term2 * term3);
+ const double xp = a / (a + (b * (std::exp(2.0 * w))));
+ return xp;
+ }
+
+ // Formulas for some special cases.
+
+ static inline double exact_upper_bound_on_p_k_eq_zero(double n, double delta) {
+ return (1.0 - std::pow(delta, (1.0 / n)));
+ }
+
+ static inline double exact_lower_bound_on_p_k_eq_n(double n, double delta) {
+ return (std::pow(delta, (1.0 / n)));
+ }
+
+ static inline double exact_lower_bound_on_p_k_eq_1(double n, double delta) {
+ return (1.0 - std::pow((1.0 - delta), (1.0 / n)));
+ }
+
+ static inline double exact_upper_bound_on_p_k_eq_minusone(double n, double delta) {
+ return (std::pow((1.0 - delta), (1.0 / n)));
+ }
+
+};
+
+}
+
+#endif // _BOUNDS_BINOMIAL_PROPORTIONS_HPP_
diff --git a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp b/be/src/thirdparty/datasketches/common_defs.hpp
similarity index 66%
copy from be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
copy to be/src/thirdparty/datasketches/common_defs.hpp
index 67563c5..7a7b40d 100644
--- a/be/src/thirdparty/datasketches/CompositeInterpolationXTable.hpp
+++ b/be/src/thirdparty/datasketches/common_defs.hpp
@@ -17,24 +17,20 @@
* under the License.
*/
-#ifndef _COMPOSITEINTERPOLATIONXTABLE_HPP_
-#define _COMPOSITEINTERPOLATIONXTABLE_HPP_
+#ifndef _COMMON_DEFS_HPP_
+#define _COMMON_DEFS_HPP_
+#include <cstdint>
+#include <string>
#include <memory>
namespace datasketches {
-template<typename A = std::allocator<char>>
-class CompositeInterpolationXTable {
- public:
- static const int get_y_stride(int logK);
+static const uint64_t DEFAULT_SEED = 9001; // default hash seed, e.g. passed to MurmurHash3 by hll_sketch updates
- static const double* const get_x_arr(int logK);
- static const int get_x_arr_length(int logK);
-};
+template<typename A> using AllocChar = typename std::allocator_traits<A>::template rebind_alloc<char>; // allocator A rebound to char
+template<typename A> using string = std::basic_string<char, std::char_traits<char>, AllocChar<A>>; // std::basic_string allocating via AllocChar<A>
-}
+} // namespace
-#include "CompositeInterpolationXTable-internal.hpp"
-
-#endif /* _COMPOSITEINTERPOLATIONXTABLE_HPP_ */
\ No newline at end of file
+#endif // _COMMON_DEFS_HPP_
diff --git a/be/src/thirdparty/datasketches/count_zeros.hpp b/be/src/thirdparty/datasketches/count_zeros.hpp
new file mode 100644
index 0000000..0c9f6b4
--- /dev/null
+++ b/be/src/thirdparty/datasketches/count_zeros.hpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _COUNT_ZEROS_HPP_
+#define _COUNT_ZEROS_HPP_
+
+#include <cstdint>
+
+#include <stdio.h>
+
+namespace datasketches {
+
+// byte_leading_zeros_table[b]: number of leading zero bits in the 8-bit value b
+// (8 when b == 0).
+static const uint8_t byte_leading_zeros_table[256] = {
+ 8, 7, 6, 6, 5, 5, 5, 5, 4, 4, 4, 4, 4, 4, 4, 4,
+ 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
+ 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+ 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+};
+
+// byte_trailing_zeros_table[b]: number of trailing zero bits in the 8-bit value b
+// (8 when b == 0).
+static const uint8_t byte_trailing_zeros_table[256] = {
+ 8, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 7, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 6, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 5, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0,
+ 4, 0, 1, 0, 2, 0, 1, 0, 3, 0, 1, 0, 2, 0, 1, 0
+};
+
+// FCLZ_MASK_NN has the low NN bits set. count_leading_zeros_in_u64 compares its input
+// against these to find the most significant non-zero byte; FCLZ_MASK_08 also
+// extracts a single byte.
+static const uint64_t FCLZ_MASK_56 = 0x00ffffffffffffff;
+static const uint64_t FCLZ_MASK_48 = 0x0000ffffffffffff;
+static const uint64_t FCLZ_MASK_40 = 0x000000ffffffffff;
+static const uint64_t FCLZ_MASK_32 = 0x00000000ffffffff;
+static const uint64_t FCLZ_MASK_24 = 0x0000000000ffffff;
+static const uint64_t FCLZ_MASK_16 = 0x000000000000ffff;
+static const uint64_t FCLZ_MASK_08 = 0x00000000000000ff;
+
+// Returns the number of leading zero bits in a 64-bit word (64 when input == 0).
+// Finds the most significant non-zero byte by comparing against the FCLZ_MASK_*
+// thresholds, then finishes with a lookup in byte_leading_zeros_table.
+static inline uint8_t count_leading_zeros_in_u64(uint64_t input) {
+  if (input > FCLZ_MASK_56)
+    return byte_leading_zeros_table[(input >> 56) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_48)
+    return 8 + byte_leading_zeros_table[(input >> 48) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_40)
+    return 16 + byte_leading_zeros_table[(input >> 40) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_32)
+    return 24 + byte_leading_zeros_table[(input >> 32) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_24)
+    return 32 + byte_leading_zeros_table[(input >> 24) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_16)
+    return 40 + byte_leading_zeros_table[(input >> 16) & FCLZ_MASK_08];
+  if (input > FCLZ_MASK_08)
+    return 48 + byte_leading_zeros_table[(input >> 8) & FCLZ_MASK_08];
+  // Only the low byte can be non-zero here. This return is unconditional so every
+  // control path visibly returns a value; the former `if (true)` guard left an
+  // apparent fall-through off the end of a non-void function.
+  return 56 + byte_leading_zeros_table[input & FCLZ_MASK_08];
+}
+
+// Returns the number of trailing zero bits in a 32-bit word (32 when input == 0),
+// examining one byte at a time from the least significant end.
+static inline uint8_t count_trailing_zeros_in_u32(uint32_t input) {
+  for (int shift = 0; shift < 32; shift += 8) {
+    const int low_byte = static_cast<int>((input >> shift) & 0xff);
+    if (low_byte != 0) return shift + byte_trailing_zeros_table[low_byte];
+  }
+  return 32;
+}
+
+// Returns the number of trailing zero bits in a 64-bit word (64 when input == 0),
+// examining one byte at a time from the least significant end.
+static inline uint8_t count_trailing_zeros_in_u64(uint64_t input) {
+  for (int shift = 0; shift < 64; shift += 8) {
+    const int low_byte = static_cast<int>((input >> shift) & 0xff);
+    if (low_byte != 0) return shift + byte_trailing_zeros_table[low_byte];
+  }
+  return 64;
+}
+
+} /* namespace datasketches */
+
+#endif // _COUNT_ZEROS_HPP_
diff --git a/be/src/thirdparty/datasketches/hll.hpp b/be/src/thirdparty/datasketches/hll.hpp
index 31db794..c76ba48 100644
--- a/be/src/thirdparty/datasketches/hll.hpp
+++ b/be/src/thirdparty/datasketches/hll.hpp
@@ -20,6 +20,7 @@
#ifndef _HLL_HPP_
#define _HLL_HPP_
+#include "common_defs.hpp"
#include "HllUtil.hpp"
#include <memory>
@@ -194,31 +195,16 @@ class hll_sketch_alloc final {
/**
* Human readable summary with optional detail
- * @param os std::ostram to which the summary is written
* @param summary if true, output the sketch summary
* @param detail if true, output the internal data array
* @param auxDetail if true, output the internal Aux array, if it exists.
* @param all if true, outputs all entries including empty ones
* @return human readable string with optional detail.
*/
- std::ostream& to_string(std::ostream& os,
- bool summary = true,
- bool detail = false,
- bool aux_detail = false,
- bool all = false) const;
-
- /**
- * Human readable summary with optional detail
- * @param summary if true, output the sketch summary
- * @param detail if true, output the internal data array
- * @param auxDetail if true, output the internal Aux array, if it exists.
- * @param all if true, outputs all entries including empty ones
- * @return human readable string with optional detail.
- */
- std::string to_string(bool summary = true,
- bool detail = false,
- bool aux_detail = false,
- bool all = false) const;
+ string<A> to_string(bool summary = true,
+ bool detail = false,
+ bool aux_detail = false,
+ bool all = false) const;
/**
* Present the given std::string as a potential unique item.
@@ -547,63 +533,6 @@ class hll_union_alloc {
*/
hll_sketch_alloc<A> get_result(target_hll_type tgt_type = HLL_4) const;
- typedef vector_u8<A> vector_bytes; // alias for users
-
- /**
- * Serializes the sketch to a byte array, compacting data structures
- * where feasible to eliminate unused storage in the serialized image.
- * @param header_size_bytes Allows for PostgreSQL integration
- */
- vector_bytes serialize_compact() const;
-
- /**
- * Serializes the sketch to a byte array, retaining all internal
- * data structures in their current form.
- */
- vector_bytes serialize_updatable() const;
-
- /**
- * Serializes the sketch to an ostream, compacting data structures
- * where feasible to eliminate unused storage in the serialized image.
- * @param os std::ostream to use for output.
- */
- void serialize_compact(std::ostream& os) const;
-
- /**
- * Serializes the sketch to an ostream, retaining all internal data
- * structures in their current form.
- * @param os std::ostream to use for output.
- */
- void serialize_updatable(std::ostream& os) const;
-
- /**
- * Human readable summary with optional detail
- * @param os std::ostram to which the summary is written
- * @param summary if true, output the sketch summary
- * @param detail if true, output the internal data array
- * @param auxDetail if true, output the internal Aux array, if it exists.
- * @param all if true, outputs all entries including empty ones
- * @return human readable string with optional detail.
- */
- std::ostream& to_string(std::ostream& os,
- bool summary = true,
- bool detail = false,
- bool aux_Detail = false,
- bool all = false) const;
-
- /**
- * Human readable summary with optional detail
- * @param summary if true, output the sketch summary
- * @param detail if true, output the internal data array
- * @param auxDetail if true, output the internal Aux array, if it exists.
- * @param all if true, outputs all entries including empty ones
- * @return human readable string with optional detail.
- */
- std::string to_string(bool summary = true,
- bool detail = false,
- bool aux_detail = false,
- bool all = false) const;
-
/**
* Update this union operator with the given sketch.
* @param The given sketch.
@@ -742,12 +671,6 @@ class hll_union_alloc {
hll_sketch_alloc<A> gadget;
};
-template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_sketch_alloc<A>& sketch);
-
-template<typename A>
-static std::ostream& operator<<(std::ostream& os, const hll_union_alloc<A>& union_in);
-
/// convenience alias for hll_sketch with default allocator
typedef hll_sketch_alloc<> hll_sketch;
diff --git a/be/src/thirdparty/datasketches/kll_helper.hpp b/be/src/thirdparty/datasketches/kll_helper.hpp
new file mode 100644
index 0000000..4857f51
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_helper.hpp
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_HELPER_HPP_
+#define KLL_HELPER_HPP_
+
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <random>
+#include <stdexcept>
+#include <type_traits>
+
+namespace datasketches {
+
+// Source of single random bits (0 or 1) used by kll_helper::randomly_halve_down/up
+// to choose which half of a level survives compaction. It is thread_local so each
+// thread owns its engine. A single shared engine is mutated on every call with no
+// synchronization, which is a data race when sketches are compacted concurrently.
+// Each thread's copy is seeded from the system clock when it is initialized.
+static thread_local std::independent_bits_engine<std::mt19937, 1, uint32_t> random_bit(std::chrono::system_clock::now().time_since_epoch().count());
+
+#ifdef KLL_VALIDATION
+// Defined elsewhere. Read and flipped by kll_helper::deterministic_offset(), which
+// validation builds use in place of random_bit() to get reproducible halving.
+extern uint32_t kll_next_offset;
+#endif
+
+// powers_of_three[p] == 3^p. Used as the divisor in kll_helper::int_cap_aux_aux,
+// which rejects depth > 30.
+// 0 <= power <= 30
+static const uint64_t powers_of_three[] = {1, 3, 9, 27, 81, 243, 729, 2187, 6561, 19683, 59049, 177147, 531441,
+1594323, 4782969, 14348907, 43046721, 129140163, 387420489, 1162261467,
+3486784401, 10460353203, 31381059609, 94143178827, 282429536481,
+847288609443, 2541865828329, 7625597484987, 22876792454961, 68630377364883,
+205891132094649};
+
+// Static helper routines for the KLL sketch. They cover level/capacity arithmetic,
+// random halving and merging of sorted runs, the general compaction step, and
+// placement copy/move of item ranges. Every member function is static.
+class kll_helper {
+  public:
+    // Parity tests.
+    static inline bool is_even(uint32_t value);
+    static inline bool is_odd(uint32_t value);
+
+    // Level and capacity arithmetic.
+    static inline uint8_t floor_of_log2_of_fraction(uint64_t numer, uint64_t denom);
+    static inline uint8_t ub_on_num_levels(uint64_t n);
+    static inline uint32_t compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels);
+    static inline uint32_t level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid);
+    static inline uint32_t int_cap_aux(uint16_t k, uint8_t depth);
+    static inline uint32_t int_cap_aux_aux(uint16_t k, uint8_t depth);
+    static inline uint64_t sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels);
+
+    /*
+     * Floating-point overload. Checks that no value is NaN and that each value
+     * compares strictly less (per C) than its successor, i.e. values are unique and
+     * increasing. Throws std::invalid_argument on the first violation.
+     */
+    template <typename T, typename C>
+    static typename std::enable_if<std::is_floating_point<T>::value, void>::type
+    validate_values(const T* values, uint32_t size) {
+      for (uint32_t idx = 0; idx < size; ++idx) {
+        if (std::isnan(values[idx])) {
+          throw std::invalid_argument("Values must not be NaN");
+        }
+        const bool has_next = idx + 1 < size;
+        if (has_next && !C()(values[idx], values[idx + 1])) {
+          throw std::invalid_argument("Values must be unique and monotonically increasing");
+        }
+      }
+    }
+
+    /*
+     * Overload for every non-floating-point type. Checks that each value compares
+     * strictly less (per C) than its successor. Throws std::invalid_argument otherwise.
+     */
+    template <typename T, typename C>
+    static typename std::enable_if<!std::is_floating_point<T>::value, void>::type
+    validate_values(const T* values, uint32_t size) {
+      for (uint32_t idx = 0; idx + 1 < size; ++idx) {
+        if (!C()(values[idx], values[idx + 1])) {
+          throw std::invalid_argument("Values must be unique and monotonically increasing");
+        }
+      }
+    }
+
+    // Keeps every other element of buf[start, start + length) (length must be even),
+    // starting at an offset of 0 or 1, packed into the lower half of the range.
+    template <typename T>
+    static void randomly_halve_down(T* buf, uint32_t start, uint32_t length);
+
+    // Like randomly_halve_down, but the survivors are packed into the upper half.
+    template <typename T>
+    static void randomly_halve_up(T* buf, uint32_t start, uint32_t length);
+
+    // In-buffer merge of two sorted runs of buf into buf[start_c, ...) by move
+    // assignment. Destination slots must already hold constructed objects; the
+    // sources are not destroyed.
+    template <typename T, typename C>
+    static void merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c);
+
+    // Cross-buffer merge into buf_c. Destination objects are constructed in place
+    // (in the destination buffer); buf_a elements are passed through std::move and
+    // then destroyed; buf_b elements are copied.
+    template <typename T, typename C>
+    static void merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c);
+
+    // Outcome of general_compress.
+    struct compress_result {
+      uint8_t final_num_levels;
+      uint32_t final_capacity;
+      uint32_t final_num_items;
+    };
+
+    /*
+     * Compacts the sketch level by level, from the bottom up.
+     * A level that does not need compaction is moved over unchanged. Otherwise:
+     * an odd leftover item (if any) stays on the level, and the rest are halved.
+     * They are halved up when the level above is empty; otherwise they are halved
+     * down and merged into the level above, whose boundaries are then adjusted.
+     *
+     * It can be proved that the result satisfies the space constraints no matter
+     * how much data is passed in.
+     * Levels above zero must be sorted on entry and remain sorted.
+     * Level zero need not be sorted on entry and may be unsorted afterwards.
+     */
+    template <typename T, typename C>
+    static compress_result general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+        uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted);
+
+    // Copy-constructs src[src_first, src_last) into raw storage at dst[dst_first, ...).
+    template<typename T>
+    static void copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first);
+
+    // Move-constructs src[src_first, src_last) into raw storage at dst[dst_first, ...),
+    // destroying each source element afterwards when destroy is true.
+    template<typename T>
+    static void move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy);
+
+#ifdef KLL_VALIDATION
+  private:
+    // Reproducible stand-in for random_bit() in validation builds.
+    static inline uint32_t deterministic_offset();
+#endif
+
+};
+
+} /* namespace datasketches */
+
+#include "kll_helper_impl.hpp"
+
+#endif // KLL_HELPER_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_helper_impl.hpp b/be/src/thirdparty/datasketches/kll_helper_impl.hpp
new file mode 100644
index 0000000..011a78d
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_helper_impl.hpp
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_HELPER_IMPL_HPP_
+#define KLL_HELPER_IMPL_HPP_
+
+#include <algorithm>
+
+namespace datasketches {
+
+// True when the lowest bit of value is clear.
+bool kll_helper::is_even(uint32_t value) {
+  return value % 2 == 0;
+}
+
+// True when the lowest bit of value is set.
+bool kll_helper::is_odd(uint32_t value) {
+  return value % 2 != 0;
+}
+
+// Returns floor(log2(numer / denom)), i.e. the largest c with denom * 2^c <= numer,
+// or 0 when denom > numer.
+// The previous loop doubled denom before comparing. When numer >= 2^63 (e.g. with
+// denom == 1) the doubling overflowed to 0 and the loop never ended; denom == 0 also
+// looped forever. The loop now only doubles while the result stays <= numer, so it
+// cannot overflow, and a zero denominator is rejected up front.
+uint8_t kll_helper::floor_of_log2_of_fraction(uint64_t numer, uint64_t denom) {
+  if (denom == 0) throw std::invalid_argument("denom == 0");
+  if (denom > numer) return 0;
+  uint8_t count = 0;
+  // denom <= numer / 2  <=>  2 * denom <= numer, and 2 * denom cannot overflow here
+  while (denom <= numer / 2) {
+    denom <<= 1;
+    count++;
+  }
+  return count;
+}
+
+// Returns 1 + floor(log2(n)), or 1 when n == 0.
+uint8_t kll_helper::ub_on_num_levels(uint64_t n) {
+  return (n == 0) ? 1 : 1 + floor_of_log2_of_fraction(n, 1);
+}
+
+// Sums level_capacity over every level 0 .. num_levels - 1.
+uint32_t kll_helper::compute_total_capacity(uint16_t k, uint8_t m, uint8_t num_levels) {
+  uint32_t capacity = 0;
+  for (uint8_t level = 0; level != num_levels; ++level) {
+    capacity += level_capacity(k, num_levels, level, m);
+  }
+  return capacity;
+}
+
+// Capacity of the level at `height` in a sketch with numLevels levels: the larger of
+// min_wid and int_cap_aux(k, depth), where depth counts down from the top level.
+// Throws std::invalid_argument if height is not below numLevels.
+uint32_t kll_helper::level_capacity(uint16_t k, uint8_t numLevels, uint8_t height, uint8_t min_wid) {
+  if (!(height < numLevels)) {
+    throw std::invalid_argument("height >= numLevels");
+  }
+  const uint8_t depth = numLevels - height - 1;
+  const uint32_t floor_width = min_wid;
+  const uint32_t computed = int_cap_aux(k, depth);
+  return computed > floor_width ? computed : floor_width;
+}
+
+// Capacity helper for depths up to 60. Depths above 30 are split into two steps,
+// because int_cap_aux_aux (and powers_of_three) only go up to depth 30.
+uint32_t kll_helper::int_cap_aux(uint16_t k, uint8_t depth) {
+  if (depth > 60) throw std::invalid_argument("depth > 60");
+  if (depth > 30) {
+    const uint8_t lower = depth / 2;
+    const uint8_t upper = depth - lower;
+    return int_cap_aux_aux(int_cap_aux_aux(k, lower), upper);
+  }
+  return int_cap_aux_aux(k, depth);
+}
+
+// Returns approximately k * (2/3)^depth, rounded. It scales 2k by 2^depth, divides
+// by 3^depth, then halves with rounding. Throws std::invalid_argument for depth > 30
+// and std::logic_error if the result would exceed k.
+uint32_t kll_helper::int_cap_aux_aux(uint16_t k, uint8_t depth) {
+  if (depth > 30) throw std::invalid_argument("depth > 30");
+  const uint64_t doubled_k = static_cast<uint64_t>(k) * 2;
+  const uint64_t scaled = (doubled_k << depth) / powers_of_three[depth];
+  const uint64_t rounded = (scaled + 1) / 2;
+  if (rounded > k) throw std::logic_error("result > k");
+  return static_cast<uint32_t>(rounded);
+}
+
+// Sums (items on level h) * 2^h over all levels. levels[h] .. levels[h + 1] bound
+// level h, so levels must hold num_levels + 1 entries.
+uint64_t kll_helper::sum_the_sample_weights(uint8_t num_levels, const uint32_t* levels) {
+  uint64_t total = 0;
+  uint64_t level_weight = 1;  // doubles with each level
+  const uint32_t* boundary = levels;
+  for (uint8_t remaining = num_levels; remaining > 0; --remaining, ++boundary) {
+    total += level_weight * (boundary[1] - boundary[0]);
+    level_weight <<= 1;
+  }
+  return total;
+}
+
+// Keeps every other element of buf[start, start + length), beginning at
+// start + offset, and packs the survivors into [start, start + length / 2).
+// offset comes from random_bit(), or from deterministic_offset() in KLL_VALIDATION
+// builds. Throws std::invalid_argument if length is odd.
+template <typename T>
+void kll_helper::randomly_halve_down(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  const uint32_t stop = start + length / 2;
+  uint32_t src = start + offset;
+  for (uint32_t dst = start; dst < stop; ++dst, src += 2) {
+    if (dst != src) buf[dst] = std::move(buf[src]);
+  }
+}
+
+// Keeps every other element of buf[start, start + length), counting down from
+// start + length - 1 - offset, and packs the survivors into the upper half
+// [start + length / 2, start + length). offset comes from random_bit(), or from
+// deterministic_offset() in KLL_VALIDATION builds. Throws std::invalid_argument
+// if length is odd.
+template <typename T>
+void kll_helper::randomly_halve_up(T* buf, uint32_t start, uint32_t length) {
+  if (!is_even(length)) throw std::invalid_argument("length must be even");
+  const uint32_t half_length = length / 2;
+#ifdef KLL_VALIDATION
+  const uint32_t offset = deterministic_offset();
+#else
+  const uint32_t offset = random_bit();
+#endif
+  // Count iterations rather than testing i >= start + half_length. With start == 0
+  // and length == 0 that test can never fail once i wraps below zero, and the loop
+  // then indexed far outside buf. The iteration count (half_length) is unchanged
+  // for every valid input.
+  uint32_t i = (start + length) - 1;
+  uint32_t j = i - offset;
+  for (uint32_t moved = 0; moved < half_length; moved++) {
+    if (i != j) buf[i] = std::move(buf[j]);
+    i--;
+    j -= 2;
+  }
+}
+
+// this version moves objects within the same buffer
+// assumes that destination has initialized objects
+// does not destroy the originals after the move
+// Merges the sorted runs buf[start_a, start_a + len_a) and buf[start_b, start_b + len_b)
+// (ordered by C) into buf[start_c, start_c + len_a + len_b) using move assignment.
+// On ties (C says neither element is smaller), the element from run b goes first.
+// Throws std::logic_error if either run was not fully consumed.
+// NOTE(review): because the merge is in place, callers must ensure writes never
+// overtake unread source elements. general_compress's call appears to satisfy this;
+// confirm for any new caller.
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(T* buf, uint32_t start_a, uint32_t len_a, uint32_t start_b, uint32_t len_b, uint32_t start_c) {
+ const uint32_t len_c = len_a + len_b;
+ const uint32_t lim_a = start_a + len_a;
+ const uint32_t lim_b = start_b + len_b;
+ const uint32_t lim_c = start_c + len_c;
+
+ uint32_t a = start_a;
+ uint32_t b = start_b;
+
+ for (uint32_t c = start_c; c < lim_c; c++) {
+ // take from b once a is exhausted, from a once b is exhausted, else the smaller (b on ties)
+ if (a == lim_a) {
+ if (b != c) buf[c] = std::move(buf[b]);
+ b++;
+ } else if (b == lim_b) {
+ if (a != c) buf[c] = std::move(buf[a]);
+ a++;
+ } else if (C()(buf[a], buf[b])) {
+ if (a != c) buf[c] = std::move(buf[a]);
+ a++;
+ } else {
+ if (b != c) buf[c] = std::move(buf[b]);
+ b++;
+ }
+ }
+ if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+// this version is to merge from two different buffers into a third buffer
+// initializes objects in the destination buffer
+// moves objects from buf_a and destroys the originals
+// copies objects from buf_b
+// Placement-constructs the merge (ordered by C) of buf_a[start_a, start_a + len_a) and
+// buf_b[start_b, start_b + len_b) into buf_c[start_c, ...). On ties, the buf_b element
+// goes first. Throws std::logic_error if either input run was not fully consumed.
+// NOTE(review): buf_a is declared const T*, so std::move(buf_a[a]) yields a const T&&.
+// That selects T's copy constructor, not its move constructor, so buf_a elements are
+// effectively copied and then destroyed. A real move would need a non-const buf_a
+// (or a const_cast, valid only if the caller's buffer is actually mutable).
+template <typename T, typename C>
+void kll_helper::merge_sorted_arrays(const T* buf_a, uint32_t start_a, uint32_t len_a, const T* buf_b, uint32_t start_b, uint32_t len_b, T* buf_c, uint32_t start_c) {
+ const uint32_t len_c = len_a + len_b;
+ const uint32_t lim_a = start_a + len_a;
+ const uint32_t lim_b = start_b + len_b;
+ const uint32_t lim_c = start_c + len_c;
+
+ uint32_t a = start_a;
+ uint32_t b = start_b;
+
+ for (uint32_t c = start_c; c < lim_c; c++) {
+ if (a == lim_a) {
+ new (&buf_c[c]) T(buf_b[b++]);
+ } else if (b == lim_b) {
+ new (&buf_c[c]) T(std::move(buf_a[a]));
+ buf_a[a++].~T();
+ } else if (C()(buf_a[a], buf_b[b])) {
+ new (&buf_c[c]) T(std::move(buf_a[a]));
+ buf_a[a++].~T();
+ } else {
+ new (&buf_c[c]) T(buf_b[b++]);
+ }
+ }
+ if (a != lim_a || b != lim_b) throw std::logic_error("inconsistent state");
+}
+
+/*
+ * Here is what we do for each level:
+ * If it does not need to be compacted, then simply copy it over.
+ *
+ * Otherwise, it does need to be compacted, so...
+ * Copy zero or one guy over.
+ * If the level above is empty, halve up.
+ * Else the level above is nonempty, so...
+ * halve down, then merge up.
+ * Adjust the boundaries of the level above.
+ *
+ * It can be proved that general_compress returns a sketch that satisfies the space constraints
+ * no matter how much data is passed in.
+ * All levels except for level zero must be sorted before calling this, and will still be
+ * sorted afterwards.
+ * Level zero is not required to be sorted before, and may not be sorted afterwards.
+ *
+ * Returns the final number of levels, the total capacity for that many levels, and the
+ * number of items kept. out_levels receives the new level boundaries (out_levels[0] = 0).
+ * NOTE(review): while processing the current top level this writes in_levels[current_level + 2],
+ * and compaction may add levels. So in_levels presumably needs room for (final levels + 2)
+ * entries and out_levels for (final levels + 1); confirm the callers size them that way.
+ */
+template <typename T, typename C>
+kll_helper::compress_result kll_helper::general_compress(uint16_t k, uint8_t m, uint8_t num_levels_in, T* items,
+ uint32_t* in_levels, uint32_t* out_levels, bool is_level_zero_sorted)
+{
+ if (num_levels_in == 0) throw std::invalid_argument("num_levels_in == 0"); // things are too weird if zero levels are allowed
+ const uint32_t starting_item_count = in_levels[num_levels_in] - in_levels[0];
+ uint8_t current_num_levels = num_levels_in;
+ uint32_t current_item_count = starting_item_count; // decreases with each compaction
+ uint32_t target_item_count = compute_total_capacity(k, m, current_num_levels); // increases if we add levels
+ bool done_yet = false;
+ out_levels[0] = 0;
+ uint8_t current_level = 0;
+ while (!done_yet) {
+
+ // If we are at the current top level, add an empty level above it for convenience,
+ // but do not increment num_levels until later
+ if (current_level == (current_num_levels - 1)) {
+ in_levels[current_level + 2] = in_levels[current_level + 1];
+ }
+
+ const auto raw_beg = in_levels[current_level];
+ const auto raw_lim = in_levels[current_level + 1];
+ const auto raw_pop = raw_lim - raw_beg;
+
+ if ((current_item_count < target_item_count) || (raw_pop < level_capacity(k, current_num_levels, current_level, m))) {
+ // move level over as is
+ // make sure we are not moving data upwards
+ if (raw_beg < out_levels[current_level]) throw std::logic_error("wrong move");
+ std::move(&items[raw_beg], &items[raw_lim], &items[out_levels[current_level]]);
+ out_levels[current_level + 1] = out_levels[current_level] + raw_pop;
+ } else {
+ // The sketch is too full AND this level is too full, so we compact it
+ // Note: this can add a level and thus change the sketches capacities
+
+ const auto pop_above = in_levels[current_level + 2] - raw_lim;
+ const bool odd_pop = is_odd(raw_pop);
+ const auto adj_beg = odd_pop ? 1 + raw_beg : raw_beg;
+ const auto adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+ const auto half_adj_pop = adj_pop / 2;
+
+ if (odd_pop) { // move one guy over
+ items[out_levels[current_level]] = std::move(items[raw_beg]);
+ out_levels[current_level + 1] = out_levels[current_level] + 1;
+ } else { // even number of items
+ out_levels[current_level + 1] = out_levels[current_level];
+ }
+
+ // level zero might not be sorted, so we must sort it if we wish to compact it
+ if ((current_level == 0) && !is_level_zero_sorted) {
+ std::sort(&items[adj_beg], &items[adj_beg + adj_pop], C());
+ }
+
+ if (pop_above == 0) { // Level above is empty, so halve up
+ randomly_halve_up(items, adj_beg, adj_pop);
+ } else { // Level above is nonempty, so halve down, then merge up
+ randomly_halve_down(items, adj_beg, adj_pop);
+ merge_sorted_arrays<T, C>(items, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
+ }
+
+ // track the fact that we just eliminated some data
+ current_item_count -= half_adj_pop;
+
+ // adjust the boundaries of the level above
+ in_levels[current_level + 1] = in_levels[current_level + 1] - half_adj_pop;
+
+ // increment num_levels if we just compacted the old top level
+ // this creates some more capacity (the size of the new bottom level)
+ if (current_level == (current_num_levels - 1)) {
+ current_num_levels++;
+ target_item_count += level_capacity(k, current_num_levels, 0, m);
+ }
+
+ } // end of code for compacting a level
+
+ // determine whether we have processed all levels yet (including any new levels that we created)
+
+ if (current_level == (current_num_levels - 1)) done_yet = true;
+ current_level++;
+ } // end of loop over levels
+
+ if ((out_levels[current_num_levels] - out_levels[0]) != current_item_count) throw std::logic_error("inconsistent state");
+
+ // destroy the slots vacated by compaction, past the surviving items
+ // NOTE(review): assumes the items started at index 0, i.e. in_levels[0] == 0 — confirm
+ for (uint32_t i = current_item_count; i < starting_item_count; i++) items[i].~T();
+
+ compress_result result;
+ result.final_num_levels = current_num_levels;
+ result.final_capacity = target_item_count;
+ result.final_num_items = current_item_count;
+ return result;
+}
+
+// Placement-copies src[src_first, src_last) into raw storage beginning at dst[dst_first].
+template<typename T>
+void kll_helper::copy_construct(const T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first) {
+  for (size_t offset = 0; src_first + offset != src_last; ++offset) {
+    new (&dst[dst_first + offset]) T(src[src_first + offset]);
+  }
+}
+
+// Placement-moves src[src_first, src_last) into raw storage beginning at dst[dst_first].
+// When destroy is true, each source element is destroyed right after being moved from.
+template<typename T>
+void kll_helper::move_construct(T* src, size_t src_first, size_t src_last, T* dst, size_t dst_first, bool destroy) {
+  for (size_t offset = 0; src_first + offset != src_last; ++offset) {
+    T& from = src[src_first + offset];
+    new (&dst[dst_first + offset]) T(std::move(from));
+    if (destroy) from.~T();
+  }
+}
+
+#ifdef KLL_VALIDATION
+// Validation-only replacement for random_bit(). Returns the current kll_next_offset
+// and stores 1 - that value for the next call.
+uint32_t kll_helper::deterministic_offset() {
+  const uint32_t current = kll_next_offset;
+  kll_next_offset = 1 - current;
+  return current;
+}
+#endif
+
+} /* namespace datasketches */
+
+#endif // KLL_HELPER_IMPL_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp b/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp
new file mode 100644
index 0000000..f77071e
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_quantile_calculator.hpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_QUANTILE_CALCULATOR_HPP_
+#define KLL_QUANTILE_CALCULATOR_HPP_
+
+#include <memory>
+
+namespace datasketches {
+
+// Answers quantile queries over a KLL sketch's items. The constructor fills its own
+// item/weight/level buffers (allocated through A and allocators rebound from A),
+// merges the levels into one sorted run and converts the weights to cumulative form.
+template <typename T, typename C, typename A>
+class kll_quantile_calculator {
+  using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+  using AllocU64 = typename std::allocator_traits<A>::template rebind_alloc<uint64_t>;
+  public:
+    // Every level of the input, including level 0, must already be sorted.
+    kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
+    ~kll_quantile_calculator();
+    T get_quantile(double fraction) const;
+
+  private:
+    uint64_t n_;
+    T* items_;
+    uint64_t* weights_;
+    uint32_t* levels_;
+    uint8_t levels_size_;
+    uint8_t num_levels_;
+
+    void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    T approximately_answer_positional_query(uint64_t pos) const;
+    static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
+    static uint64_t pos_of_phi(double phi, uint64_t n);
+    static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
+    static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
+    static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
+    static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
+    static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);
+};
+
+} /* namespace datasketches */
+
+#include "kll_quantile_calculator_impl.hpp"
+
+#endif // KLL_QUANTILE_CALCULATOR_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp b/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp
new file mode 100644
index 0000000..d4f4c04
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_quantile_calculator_impl.hpp
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+#define KLL_QUANTILE_CALCULATOR_IMPL_HPP_
+
+#include <memory>
+#include <cmath>
+
+#include "kll_helper.hpp"
+
+namespace datasketches {
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
+  // Number of retained items across all levels of the source sketch.
+  const uint32_t item_count = levels[num_levels] - levels[0];
+  n_ = n;
+  levels_size_ = num_levels + 1;
+  items_ = A().allocate(item_count);
+  // One extra trailing slot: after the cumulative conversion it holds the total weight.
+  weights_ = AllocU64().allocate(item_count + 1);
+  levels_ = AllocU32().allocate(levels_size_);
+  populate_from_sketch(items, item_count, levels, num_levels);
+  blocky_tandem_merge_sort(items_, weights_, item_count, levels_, num_levels_);
+  convert_to_preceding_cummulative(weights_, item_count + 1);
+}
+
+template <typename T, typename C, typename A>
+kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
+  // Item count derived from the compacted level boundaries built by populate_from_sketch.
+  const uint32_t item_count = levels_[num_levels_] - levels_[0];
+  T* const items_end = items_ + item_count;
+  for (T* it = items_; it != items_end; ++it) it->~T();
+  A().deallocate(items_, item_count);
+  AllocU64().deallocate(weights_, item_count + 1);
+  AllocU32().deallocate(levels_, levels_size_);
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
+  // Translate the normalized rank into an absolute position, then find the item covering it.
+  const uint64_t position = pos_of_phi(fraction, n_);
+  return approximately_answer_positional_query(position);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  // Copy the retained items so that they start at index 0 of items_.
+  kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
+  const uint32_t base = levels[0];
+  uint8_t out_level = 0;
+  // Items of source level L get weight 2^L; empty source levels are dropped from levels_.
+  uint64_t level_weight = 1;
+  for (uint8_t in_level = 0; in_level < num_levels; ++in_level, level_weight *= 2) {
+    const uint32_t begin_idx = levels[in_level] - base;
+    const uint32_t end_idx = levels[in_level + 1] - base; // exclusive
+    if (begin_idx >= end_idx) continue; // empty level
+    std::fill(weights_ + begin_idx, weights_ + end_idx, level_weight);
+    levels_[out_level] = begin_idx;
+    levels_[out_level + 1] = end_idx;
+    ++out_level;
+  }
+  // Trailing extra slot; the cumulative conversion turns it into the total weight.
+  weights_[num_items] = 0;
+  num_levels_ = out_level;
+}
+
+template <typename T, typename C, typename A>
+T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
+  if (!(pos < n_)) throw std::logic_error("position out of range");
+  // weights_ holds one cumulative entry per item plus a trailing total.
+  return items_[chunk_containing_pos(weights_, levels_[num_levels_] + 1, pos)];
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
+  // Replaces each weight with the sum of all weights preceding it (exclusive prefix sum),
+  // so the final (extra) slot ends up holding the total weight.
+  // The running total must stay 64-bit: the total weight is expected to cover every
+  // position below the stream length n, which can exceed 2^32. Narrowing it to
+  // uint32_t would silently wrap and corrupt the cumulative weights.
+  uint64_t subtotal = 0;
+  for (uint32_t i = 0; i < weights_size; i++) {
+    const uint64_t new_subtotal = subtotal + weights[i];
+    weights[i] = subtotal;
+    subtotal = new_subtotal;
+  }
+}
+
+template <typename T, typename C, typename A>
+uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
+  const uint64_t pos = static_cast<uint64_t>(std::floor(phi * n));
+  // A rank that lands exactly one past the end (e.g. phi == 1.0) is clamped to the last position.
+  if (pos == n) return n - 1;
+  return pos;
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
+  // weights carries one entry per item plus an "extra" trailing total, so at least two are needed.
+  if (weights_size <= 1) throw std::logic_error("weights array too short");
+  const uint32_t last = weights_size - 1;
+  if (pos < weights[0]) throw std::logic_error("position too small");
+  if (pos >= weights[last]) throw std::logic_error("position too large");
+  return search_for_chunk_containing_pos(weights, pos, 0, last);
+}
+
+template <typename T, typename C, typename A>
+uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
+  // Iterative binary search narrowing [lo, hi) until a single index remains.
+  // Assuming arr is non-decreasing (cumulative weights) and arr[l] <= pos < arr[r],
+  // as chunk_containing_pos checks, the result is the last index i with arr[i] <= pos.
+  uint32_t lo = l;
+  uint32_t hi = r;
+  while (lo + 1 != hi) {
+    const uint32_t mid = lo + (hi - lo) / 2;
+    if (arr[mid] <= pos) {
+      lo = mid;
+    } else {
+      hi = mid;
+    }
+  }
+  return lo;
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
+  // A single (already sorted) level needs no merging.
+  if (num_levels == 1) return;
+
+  // Prepare a second buffer for the "ping-pong" reduction strategy.
+  // The recursion never touches single levels, and the buffer a leaf level is read from
+  // alternates with recursion depth between tmp_items and items. Both buffers must
+  // therefore hold a full, valid copy of the input, exactly as is done for the weights.
+  // Moving (rather than copying) would leave items with moved-from objects, which the
+  // merge would consume for non-trivially-movable T once there are 3 or more levels.
+  auto tmp_items_deleter = [num_items](T* ptr) {
+    for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
+    A().deallocate(ptr, num_items);
+  };
+  std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
+  // NOTE(review): if copy_construct throws part-way, the deleter still destroys all
+  // num_items slots; confirm kll_helper::copy_construct's exception behavior.
+  kll_helper::copy_construct<T>(items, 0, num_items, tmp_items.get(), 0);
+  auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
+  std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
+  std::copy(weights, &weights[num_items], tmp_weights.get());
+  blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
+  // A single level is already sorted; it stays wherever it currently is.
+  if (num_levels == 1) return;
+  const uint8_t left_count = num_levels / 2;
+  const uint8_t right_count = num_levels - left_count;
+  if (left_count < 1) throw std::logic_error("level above 0 expected");
+  if (right_count < left_count) throw std::logic_error("wrong order of levels");
+  const uint8_t left_start = starting_level;
+  const uint8_t right_start = starting_level + left_count;
+  // Sort each half with src and dst swapped, so the sorted halves land in *this* call's
+  // source buffers, from which they are merged into the destination below.
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, left_start, left_count);
+  blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, right_start, right_count);
+  tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, left_start, left_count, right_start, right_count);
+}
+
+template <typename T, typename C, typename A>
+void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
+  // Merges two sorted runs of items_src (carrying their weights along) into items_dst.
+  // Run 1 is [levels[starting_level_1], levels[starting_level_1 + num_levels_1]),
+  // run 2 is [levels[starting_level_2], levels[starting_level_2 + num_levels_2]).
+  // Output starts at the first index of run 1.
+  const auto end_1 = levels[starting_level_1 + num_levels_1]; // exclusive
+  const auto end_2 = levels[starting_level_2 + num_levels_2]; // exclusive
+  auto next_1 = levels[starting_level_1];
+  auto next_2 = levels[starting_level_2];
+  auto out = next_1;
+
+  while (next_1 < end_1 && next_2 < end_2) {
+    // Run 1 wins only when C says it is strictly less; on ties run 2 is taken first.
+    const auto pick = C()(items_src[next_1], items_src[next_2]) ? next_1++ : next_2++;
+    items_dst[out] = std::move(items_src[pick]);
+    weights_dst[out] = weights_src[pick];
+    ++out;
+  }
+
+  // At most one run still has items left; append its remainder as-is.
+  if (next_1 < end_1) {
+    std::move(items_src + next_1, items_src + end_1, items_dst + out);
+    std::copy(weights_src + next_1, weights_src + end_1, weights_dst + out);
+  } else if (next_2 < end_2) {
+    std::move(items_src + next_2, items_src + end_2, items_dst + out);
+    std::copy(weights_src + next_2, weights_src + end_2, weights_dst + out);
+  }
+}
+
+} /* namespace datasketches */
+
+#endif // KLL_QUANTILE_CALCULATOR_IMPL_HPP_
diff --git a/be/src/thirdparty/datasketches/kll_sketch.hpp b/be/src/thirdparty/datasketches/kll_sketch.hpp
new file mode 100644
index 0000000..8b3409c
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_sketch.hpp
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_SKETCH_HPP_
+#define KLL_SKETCH_HPP_
+
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "kll_quantile_calculator.hpp"
+#include "common_defs.hpp"
+#include "serde.hpp"
+
+namespace datasketches {
+
+/*
+ * Implementation of a very compact quantiles sketch with lazy compaction scheme
+ * and nearly optimal accuracy per retained item.
+ * See <a href="https://arxiv.org/abs/1603.05346v2">Optimal Quantile Approximation in Streams</a>.
+ *
+ * <p>This is a stochastic streaming sketch that enables near-real time analysis of the
+ * approximate distribution of values from a very large stream in a single pass, requiring only
+ * that the values are comparable.
+ * The analysis is obtained using <i>get_quantile()</i> or <i>get_quantiles()</i> functions or the
+ * inverse functions get_rank(), get_PMF() (Probability Mass Function), and get_CDF()
+ * (Cumulative Distribution Function).
+ *
+ * <p>As of May 2020, this implementation produces serialized sketches which are binary-compatible
+ * with the equivalent Java implementation only when template parameter T = float
+ * (32-bit single precision values).
+ *
+ * <p>Given an input stream of <i>N</i> numeric values, the <i>absolute rank</i> of any specific
+ * value is defined as its index <i>(0 to N-1)</i> in the hypothetical sorted stream of all
+ * <i>N</i> input values.
+ *
+ * <p>The <i>normalized rank</i> (<i>rank</i>) of any specific value is defined as its
+ * <i>absolute rank</i> divided by <i>N</i>.
+ * Thus, the <i>normalized rank</i> is a value between zero and one.
+ * In the documentation for this sketch <i>absolute rank</i> is never used so any
+ * reference to just <i>rank</i> should be interpreted to mean <i>normalized rank</i>.
+ *
+ * <p>This sketch is configured with a parameter <i>k</i>, which affects the size of the sketch
+ * and its estimation error.
+ *
+ * <p>The estimation error is commonly called <i>epsilon</i> (or <i>eps</i>) and is a fraction
+ * between zero and one. Larger values of <i>k</i> result in smaller values of epsilon.
+ * Epsilon is always with respect to the rank and cannot be applied to the
+ * corresponding values.
+ *
+ * <p>The relationship between the normalized rank and the corresponding values can be viewed
+ * as a two dimensional monotonic plot with the normalized rank on one axis and the
+ * corresponding values on the other axis. If the y-axis is specified as the value-axis and
+ * the x-axis as the normalized rank, then <i>y = get_quantile(x)</i> is a monotonically
+ * increasing function.
+ *
+ * <p>The functions <i>get_quantile(rank)</i> and get_quantiles(...) translate ranks into
+ * corresponding values. The functions <i>get_rank(value),
+ * get_CDF(...) (Cumulative Distribution Function), and get_PMF(...)
+ * (Probability Mass Function)</i> perform the opposite operation and translate values into ranks.
+ *
+ * <p>The <i>get_PMF(...)</i> function has about 13 to 47% worse rank error (depending
+ * on <i>k</i>) than the other queries because the mass of each "bin" of the PMF has
+ * "double-sided" error from the upper and lower edges of the bin as a result of a subtraction,
+ * as the errors from the two edges can sometimes add.
+ *
+ * <p>The default <i>k</i> of 200 yields a "single-sided" epsilon of about 1.33% and a
+ * "double-sided" (PMF) epsilon of about 1.65%.
+ *
+ * <p>A <i>get_quantile(rank)</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>v = get_quantile(r)</i> where <i>r</i> is the rank between zero and one.</li>
+ * <li>The value <i>v</i> will be a value from the input stream.</li>
+ * <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
+ * stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Then <i>r - eps ≤ trueRank ≤ r + eps</i> with a confidence of 99%. Note that the
+ * error is on the rank, not the value.</li>
+ * </ul>
+ *
+ * <p>A <i>get_rank(value)</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>r = get_rank(v)</i> where <i>v</i> is a value between the min and max values of
+ * the input stream.</li>
+ * <li>Let <i>trueRank</i> be the true rank of <i>v</i> derived from the hypothetical sorted
+ * stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Then <i>r - eps ≤ trueRank ≤ r + eps</i> with a confidence of 99%.</li>
+ * </ul>
+ *
+ * <p>A <i>get_PMF()</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>{r1, r2, ..., r(m+1)} = get_PMF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
+ * between the min and max values of the input stream.</li>
+ * <li>Let <i>mass<sub>i</sub> = estimated mass between v<sub>i</sub> and v<sub>i+1</sub></i>.</li>
+ * <li>Let <i>trueMass</i> be the true mass between the values of <i>v<sub>i</sub>,
+ * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
+ * <li>then <i>mass - eps ≤ trueMass ≤ mass + eps</i> with a confidence of 99%.</li>
+ * <li>r(m+1) includes the mass of all points larger than vm.</li>
+ * </ul>
+ *
+ * <p>A <i>get_CDF(...)</i> query has the following guarantees:
+ * <ul>
+ * <li>Let <i>{r1, r2, ..., r(m+1)} = get_CDF(v1, v2, ..., vm)</i> where <i>v1, v2</i> are values
+ * between the min and max values of the input stream.</li>
+ * <li>Let <i>mass<sub>i</sub> = r<sub>i+1</sub> - r<sub>i</sub></i>.</li>
+ * <li>Let <i>trueMass</i> be the true mass between the true ranks of <i>v<sub>i</sub>,
+ * v<sub>i+1</sub></i> derived from the hypothetical sorted stream of all <i>N</i> values.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(true)</i>.</li>
+ * <li>then <i>mass - eps ≤ trueMass ≤ mass + eps</i> with a confidence of 99%.</li>
+ * <li>1 - r(m+1) includes the mass of all points larger than vm.</li>
+ * </ul>
+ *
+ * <p>From the above, it might seem like we could make some estimates to bound the
+ * <em>value</em> returned from a call to <em>get_quantile()</em>. The sketch, however, does not
+ * let us derive error bounds or confidences around values. Because errors are independent, we
+ * can approximately bracket a value as shown below, but there are no error estimates available.
+ * Additionally, the interval may be quite large for certain distributions.
+ * <ul>
+ * <li>Let <i>v = get_quantile(r)</i>, the estimated quantile value of rank <i>r</i>.</li>
+ * <li>Let <i>eps = get_normalized_rank_error(false)</i>.</li>
+ * <li>Let <i>v<sub>lo</sub></i> = estimated quantile value of rank <i>(r - eps)</i>.</li>
+ * <li>Let <i>v<sub>hi</sub></i> = estimated quantile value of rank <i>(r + eps)</i>.</li>
+ * <li>Then <i>v<sub>lo</sub> ≤ v ≤ v<sub>hi</sub></i>, with 99% confidence.</li>
+ * </ul>
+ *
+ * author Kevin Lang
+ * author Alexander Saydakov
+ * author Lee Rhodes
+ */
+
+// Allocator types rebound from the item allocator A to the auxiliary element types.
+template<typename A> using AllocU8 = typename std::allocator_traits<A>::template rebind_alloc<uint8_t>;
+template<typename A> using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
+template<typename A> using AllocD = typename std::allocator_traits<A>::template rebind_alloc<double>;
+
+// Vectors that allocate through the rebound allocators above.
+template<typename A> using vector_u8 = std::vector<uint8_t, AllocU8<A>>;
+template<typename A> using vector_u32 = std::vector<uint32_t, AllocU32<A>>;
+template<typename A> using vector_d = std::vector<double, AllocD<A>>;
+
+// kll_sketch template parameters (as used in this header):
+//   T - item type
+//   C - comparator type, default std::less<T>
+//   S - serde type, default serde<T>; presumably used by the (de)serialization methods below
+//   A - allocator for T; rebound through vector_u8/vector_u32/vector_d for auxiliary storage
+template <typename T, typename C = std::less<T>, typename S = serde<T>, typename A = std::allocator<T>>
+class kll_sketch {
+ public:
+ static const uint8_t DEFAULT_M = 8;
+ static const uint16_t DEFAULT_K = 200;
+ static const uint16_t MIN_K = DEFAULT_M;
+ // MAX_K is the largest value representable by uint16_t, the type k is stored in.
+ static const uint16_t MAX_K = (1 << 16) - 1;
+
+ explicit kll_sketch(uint16_t k = DEFAULT_K);
+ kll_sketch(const kll_sketch& other);
+ kll_sketch(kll_sketch&& other) noexcept;
+ ~kll_sketch();
+ kll_sketch& operator=(const kll_sketch& other);
+ // NOTE(review): unlike the move constructor, move assignment is not declared noexcept.
+ kll_sketch& operator=(kll_sketch&& other);
+
+ /**
+ * Updates this sketch with the given data item.
+ * This method takes lvalue.
+ * @param value an item from a stream of items
+ */
+ void update(const T& value);
+
+ /**
+ * Updates this sketch with the given data item.
+ * This method takes rvalue.
+ * @param value an item from a stream of items
+ */
+ void update(T&& value);
+
+ /**
+ * Merges another sketch into this one.
+ * This method takes lvalue.
+ * @param other sketch to merge into this one
+ */
+ void merge(const kll_sketch& other);
+
+ /**
+ * Merges another sketch into this one.
+ * This method takes rvalue.
+ * @param other sketch to merge into this one
+ */
+ void merge(kll_sketch&& other);
+
+ /**
+ * Returns true if this sketch is empty.
+ * @return empty flag
+ */
+ bool is_empty() const;
+
+ /**
+ * Returns the length of the input stream.
+ * @return stream length
+ */
+ uint64_t get_n() const;
+
+ /**
+ * Returns the number of retained items (samples) in the sketch.
+ * @return the number of retained items
+ */
+ uint32_t get_num_retained() const;
+
+ /**
+ * Returns true if this sketch is in estimation mode.
+ * @return estimation mode flag
+ */
+ bool is_estimation_mode() const;
+
+ /**
+ * Returns the min value of the stream.
+ * For floating point types: if the sketch is empty this returns NaN.
+ * For other types: if the sketch is empty this throws runtime_error.
+ * @return the min value of the stream
+ */
+ T get_min_value() const;
+
+ /**
+ * Returns the max value of the stream.
+ * For floating point types: if the sketch is empty this returns NaN.
+ * For other types: if the sketch is empty this throws runtime_error.
+ * @return the max value of the stream
+ */
+ T get_max_value() const;
+
+ /**
+ * Returns an approximation to the value of the data item
+ * that would be preceded by the given fraction of a hypothetical sorted
+ * version of the input stream so far.
+ * <p>
+ * Note that this method has a fairly large overhead (microseconds instead of nanoseconds)
+ * so it should not be called multiple times to get different quantiles from the same
+ * sketch. Instead use get_quantiles(), which pays the overhead only once.
+ * <p>
+ * For floating point types: if the sketch is empty this returns NaN.
+ * For other types: if the sketch is empty this throws runtime_error.
+ *
+ * @param fraction the specified fractional position in the hypothetical sorted stream.
+ * These are also called normalized ranks or fractional ranks.
+ * If fraction = 0.0, the true minimum value of the stream is returned.
+ * If fraction = 1.0, the true maximum value of the stream is returned.
+ *
+ * @return the approximation to the value at the given fraction
+ */
+ T get_quantile(double fraction) const;
+
+ /**
+ * This is a more efficient multiple-query version of get_quantile().
+ * <p>
+ * This returns an array that could have been generated by using get_quantile() for each
+ * fractional rank separately, but would be very inefficient.
+ * This method incurs the internal set-up overhead once and obtains multiple quantile values in
+ * a single query. It is strongly recommended that this method be used instead of multiple calls
+ * to get_quantile().
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param fractions given array of fractional positions in the hypothetical sorted stream.
+ * These are also called normalized ranks or fractional ranks.
+ * These fractions must be in the interval [0.0, 1.0], inclusive.
+ *
+ * @return array of approximations to the given fractions in the same order as given fractions
+ * in the input array.
+ */
+ std::vector<T, A> get_quantiles(const double* fractions, uint32_t size) const;
+
+ /**
+ * This is a multiple-query version of get_quantile() that allows the caller to
+ * specify the number of evenly-spaced fractional ranks.
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param num an integer that specifies the number of evenly-spaced fractional ranks.
+ * This must be an integer greater than 0. A value of 1 will return the min value.
+ * A value of 2 will return the min and the max value. A value of 3 will return the min,
+ * the median and the max value, etc.
+ *
+ * @return array of approximations to the given number of evenly-spaced fractional ranks.
+ */
+ std::vector<T, A> get_quantiles(size_t num) const;
+
+ /**
+ * Returns an approximation to the normalized (fractional) rank of the given value from 0 to 1,
+ * inclusive.
+ *
+ * <p>The resulting approximation has a probabilistic guarantee that can be obtained from the
+ * get_normalized_rank_error(false) function.
+ *
+ * <p>If the sketch is empty this returns NaN.
+ *
+ * @param value to be ranked
+ * @return an approximate rank of the given value
+ */
+ double get_rank(const T& value) const;
+
+ /**
+ * Returns an approximation to the Probability Mass Function (PMF) of the input stream
+ * given a set of split points (values).
+ *
+ * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+ * get_normalized_rank_error(true) function.
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param split_points an array of <i>m</i> unique, monotonically increasing values of type T
+ * that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+ * exclusive of the right split point, with the exception that the last interval will include
+ * the maximum value.
+ * It is not necessary to include either the min or max values in these split points.
+ *
+ * @return an array of m+1 doubles each of which is an approximation
+ * to the fraction of the input stream values (the mass) that fall into one of those intervals.
+ * The definition of an "interval" is inclusive of the left split point and exclusive of the right
+ * split point, with the exception that the last interval will include maximum value.
+ */
+ vector_d<A> get_PMF(const T* split_points, uint32_t size) const;
+
+ /**
+ * Returns an approximation to the Cumulative Distribution Function (CDF), which is the
+ * cumulative analog of the PMF, of the input stream given a set of split points (values).
+ *
+ * <p>The resulting approximations have a probabilistic guarantee that can be obtained from the
+ * get_normalized_rank_error(false) function.
+ *
+ * <p>If the sketch is empty this returns an empty vector.
+ *
+ * @param split_points an array of <i>m</i> unique, monotonically increasing values of type T
+ * that divide the real number line into <i>m+1</i> consecutive disjoint intervals.
+ * The definition of an "interval" is inclusive of the left split point (or minimum value) and
+ * exclusive of the right split point, with the exception that the last interval will include
+ * the maximum value.
+ * It is not necessary to include either the min or max values in these split points.
+ *
+ * @return an array of m+1 double values, which are a consecutive approximation to the CDF
+ * of the input stream given the split_points. The value at array position j of the returned
+ * CDF array is the sum of the returned values in positions 0 through j of the returned PMF
+ * array.
+ */
+ vector_d<A> get_CDF(const T* split_points, uint32_t size) const;
+
+ /**
+ * Gets the approximate rank error of this sketch normalized as a fraction between zero and one.
+ * @param pmf if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+ * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+ * @return if pmf is true, returns the normalized rank error for the get_PMF() function.
+ * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+ */
+ double get_normalized_rank_error(bool pmf) const;
+
+ /**
+ * Computes size needed to serialize the current state of the sketch.
+ * This version is for fixed-size arithmetic types (integral and floating point).
+ * @return size in bytes needed to serialize this sketch
+ */
+ template<typename TT = T, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes() const;
+
+ /**
+ * Computes size needed to serialize the current state of the sketch.
+ * This version is for all other types and can be expensive since every item needs to be looked at.
+ * @return size in bytes needed to serialize this sketch
+ */
+ template<typename TT = T, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type = 0>
+ size_t get_serialized_size_bytes() const;
+
+ /**
+ * This method serializes the sketch into a given stream in a binary form
+ * @param os output stream
+ */
+ void serialize(std::ostream& os) const;
+
+ // This is a convenience alias for users
+ // The type returned by the following serialize method
+ typedef vector_u8<A> vector_bytes;
+
+ /**
+ * This method serializes the sketch as a vector of bytes.
+ * An optional header can be reserved in front of the sketch.
+ * It is a blank space of a given size.
+ * This header is used in Datasketches PostgreSQL extension.
+ * @param header_size_bytes space to reserve in front of the sketch
+ */
+ vector_bytes serialize(unsigned header_size_bytes = 0) const;
+
+ /**
+ * This method deserializes a sketch from a given stream.
+ * @param is input stream
+ * @return an instance of a sketch
+ */
+ static kll_sketch<T, C, S, A> deserialize(std::istream& is);
+
+ /**
+ * This method deserializes a sketch from a given array of bytes.
+ * @param bytes pointer to the array of bytes
+ * @param size the size of the array
+ * @return an instance of a sketch
+ */
+ static kll_sketch<T, C, S, A> deserialize(const void* bytes, size_t size);
+
+ /*
+ * Gets the normalized rank error given k and pmf.
+ * k - the configuration parameter
+ * pmf - if true, returns the "double-sided" normalized rank error for the get_PMF() function.
+ * Otherwise, it is the "single-sided" normalized rank error for all the other queries.
+ * Constants were derived as the best fit to 99 percentile empirically measured max error in thousands of trials
+ */
+ static double get_normalized_rank_error(uint16_t k, bool pmf);
+
+ /**
+ * Prints a summary of the sketch.
+ * @param print_levels if true include information about levels
+ * @param print_items if true include sketch data
+ */
+ string<A> to_string(bool print_levels = false, bool print_items = false) const;
+
+ class const_iterator;
+ const_iterator begin() const;
+ const_iterator end() const;
+
+ #ifdef KLL_VALIDATION
+ uint8_t get_num_levels() { return num_levels_; }
+ uint32_t* get_levels() { return levels_; }
+ T* get_items() { return items_; }
+ #endif
+
+ private:
+ /* Serialized sketch layout:
+ * Adr:
+ * || 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
+ * 0 || unused | M |--------K--------| Flags | FamID | SerVer | PreambleInts |
+ * || 15 | 14 | 13 | 12 | 11 | 10 | 9 | 8 |
+ * 1 ||-----------------------------------N------------------------------------------|
+ * || 23 | 22 | 21 | 20 | 19 | 18 | 17 | 16 |
+ * 2 ||---------------data----------------|-unused-|numLevels|-------min K-----------|
+ */
+
+ static const size_t EMPTY_SIZE_BYTES = 8;
+ static const size_t DATA_START_SINGLE_ITEM = 8;
+ static const size_t DATA_START = 20;
+
+ static const uint8_t SERIAL_VERSION_1 = 1;
+ static const uint8_t SERIAL_VERSION_2 = 2;
+ static const uint8_t FAMILY = 15;
+
+ enum flags { IS_EMPTY, IS_LEVEL_ZERO_SORTED, IS_SINGLE_ITEM };
+
+ static const uint8_t PREAMBLE_INTS_SHORT = 2; // for empty and single item
+ static const uint8_t PREAMBLE_INTS_FULL = 5;
+
+ // NOTE: the declaration order below is also the member initialization order used by
+ // the constructors; do not reorder without checking every constructor.
+ uint16_t k_;
+ uint8_t m_; // minimum buffer "width"
+ uint16_t min_k_; // for error estimation after merging with different k
+ uint64_t n_;
+ uint8_t num_levels_;
+ // Level boundaries: level i presumably occupies items_[levels_[i], levels_[i + 1]) (the same
+ // convention kll_quantile_calculator uses); a new sketch starts with levels_[0] == levels_[1] == k.
+ vector_u32<A> levels_;
+ // Buffer of items_size_ slots allocated through A.
+ T* items_;
+ uint32_t items_size_;
+ // Both are nullptr in a newly constructed sketch.
+ T* min_value_;
+ T* max_value_;
+ bool is_level_zero_sorted_;
+
+ // for deserialization
+ class item_deleter;
+ class items_deleter;
+ kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
+ std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
+ std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted);
+
+ // common update code
+ inline void update_min_max(const T& value);
+ inline uint32_t internal_update();
+
+ // The following code is only valid in the special case of exactly reaching capacity while updating.
+ // It cannot be used while merging, while reducing k, or anything else.
+ void compress_while_updating(void);
+
+ uint8_t find_level_to_compact() const;
+ void add_empty_top_level_to_completely_full_sketch();
+ void sort_level_zero();
+ std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> get_quantile_calculator();
+ vector_d<A> get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const;
+ void increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+ const T* split_points, uint32_t size, double* buckets) const;
+ void increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+ const T* split_points, uint32_t size, double* buckets) const;
+ template<typename O> void merge_higher_levels(O&& other, uint64_t final_n);
+ void populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+ void populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels);
+ void assert_correct_total_weight() const;
+ uint32_t safe_level_size(uint8_t level) const;
+ uint32_t get_num_retained_above_level_zero() const;
+
+ static void check_m(uint8_t m);
+ static void check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte);
+ static void check_serial_version(uint8_t serial_version);
+ static void check_family_id(uint8_t family_id);
+
+ // implementations for floating point types
+ // (the invalid value is NaN; check_update_value returns false for NaN inputs)
+ template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+ static TT get_invalid_value() {
+ return std::numeric_limits<TT>::quiet_NaN();
+ }
+
+ template<typename TT = T, typename std::enable_if<std::is_floating_point<TT>::value, int>::type = 0>
+ static inline bool check_update_value(TT value) {
+ return !std::isnan(value);
+ }
+
+ // implementations for all other types
+ // (there is no invalid value, so asking for one throws; every update value is accepted)
+ template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+ static TT get_invalid_value() {
+ throw std::runtime_error("getting quantiles from empty sketch is not supported for this type of values");
+ }
+
+ template<typename TT = T, typename std::enable_if<!std::is_floating_point<TT>::value, int>::type = 0>
+ static inline bool check_update_value(TT) {
+ return true;
+ }
+
+};
+
+// Input iterator over the retained items of a kll_sketch. Dereferencing yields a pair of
+// (item reference, uint64_t weight); the exact weight semantics live in kll_sketch_impl.hpp.
+// NOTE(review): std::iterator is deprecated since C++17, and the postfix operator++ below is
+// declared to return a reference instead of a copy of the previous state -- confirm this is
+// intended before relying on `it++` semantics.
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::const_iterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+ friend class kll_sketch<T, C, S, A>;
+ const_iterator& operator++();
+ const_iterator& operator++(int);
+ bool operator==(const const_iterator& other) const;
+ bool operator!=(const const_iterator& other) const;
+ const std::pair<const T&, const uint64_t> operator*() const;
+private:
+ // Non-owning views into the sketch's item and level arrays.
+ const T* items;
+ const uint32_t* levels;
+ const uint8_t num_levels;
+ // Current traversal state.
+ uint32_t index;
+ uint8_t level;
+ uint64_t weight;
+ // Only kll_sketch (a friend) constructs iterators, via begin()/end().
+ const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels);
+};
+
+} /* namespace datasketches */
+
+#include "kll_sketch_impl.hpp"
+
+#endif
diff --git a/be/src/thirdparty/datasketches/kll_sketch_impl.hpp b/be/src/thirdparty/datasketches/kll_sketch_impl.hpp
new file mode 100644
index 0000000..9b6c68e
--- /dev/null
+++ b/be/src/thirdparty/datasketches/kll_sketch_impl.hpp
@@ -0,0 +1,1130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef KLL_SKETCH_IMPL_HPP_
+#define KLL_SKETCH_IMPL_HPP_
+
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+
+#include "memory_operations.hpp"
+#include "kll_helper.hpp"
+
+namespace datasketches {
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k):
+k_(k),
+m_(DEFAULT_M),
+min_k_(k),
+n_(0),
+num_levels_(1),
+levels_(2),
+items_(nullptr),
+items_size_(k_),
+min_value_(nullptr),
+max_value_(nullptr),
+is_level_zero_sorted_(false)
+{
+  // Validate before touching the allocator, so an out-of-range k allocates nothing.
+  const bool k_in_range = (k >= MIN_K) && (k <= MAX_K);
+  if (!k_in_range) {
+    throw std::invalid_argument("K must be >= " + std::to_string(MIN_K) + " and <= " + std::to_string(MAX_K) + ": " + std::to_string(k));
+  }
+  // A fresh sketch has a single, empty level 0 positioned at the top of a k-item buffer;
+  // internal_update() fills it downward from index k.
+  levels_[1] = k;
+  levels_[0] = levels_[1];
+  items_ = A().allocate(items_size_);
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(const kll_sketch& other):
+k_(other.k_),
+m_(other.m_),
+min_k_(other.min_k_),
+n_(other.n_),
+num_levels_(other.num_levels_),
+levels_(other.levels_),
+items_(nullptr),
+items_size_(other.items_size_),
+min_value_(nullptr),
+max_value_(nullptr),
+is_level_zero_sorted_(other.is_level_zero_sorted_)
+{
+  // items_ is raw allocator storage, so the retained items [levels_[0], levels_[num_levels_])
+  // must be copy-constructed in place; assigning onto never-constructed objects is undefined
+  // behavior for non-trivial T.
+  items_ = A().allocate(items_size_);
+  const uint32_t begin = levels_[0];
+  const uint32_t end = levels_[num_levels_];
+  try {
+    std::uninitialized_copy(other.items_ + begin, other.items_ + end, items_ + begin);
+  } catch (...) {
+    // uninitialized_copy has already destroyed any partial copies; the buffer must be released
+    // here because the destructor does not run when a constructor throws.
+    A().deallocate(items_, items_size_);
+    throw;
+  }
+  if (other.min_value_ != nullptr) min_value_ = new (A().allocate(1)) T(*other.min_value_);
+  if (other.max_value_ != nullptr) max_value_ = new (A().allocate(1)) T(*other.max_value_);
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(kll_sketch&& other) noexcept:
+k_(other.k_),
+m_(other.m_),
+min_k_(other.min_k_),
+n_(other.n_),
+num_levels_(other.num_levels_),
+levels_(std::move(other.levels_)),
+items_(other.items_),
+items_size_(other.items_size_),
+min_value_(other.min_value_),
+max_value_(other.max_value_),
+is_level_zero_sorted_(other.is_level_zero_sorted_)
+{
+  // The three heap buffers now belong to *this; null them in the source so that its
+  // destructor (which skips null pointers) frees nothing.
+  other.items_ = other.min_value_ = other.max_value_ = nullptr;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(const kll_sketch& other) {
+  // Copy-and-swap: build the copy first (if it throws, *this is untouched), then exchange all
+  // members with it through the member-wise swapping move assignment. Our previous state ends
+  // up in tmp and is released when tmp goes out of scope.
+  kll_sketch<T, C, S, A> tmp(other);
+  *this = std::move(tmp);
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>& kll_sketch<T, C, S, A>::operator=(kll_sketch&& other) {
+  // Exchange every member with other; other's destructor then disposes of our former state.
+  // Owned buffers:
+  std::swap(items_, other.items_);
+  std::swap(items_size_, other.items_size_);
+  std::swap(min_value_, other.min_value_);
+  std::swap(max_value_, other.max_value_);
+  std::swap(levels_, other.levels_);
+  // Scalar configuration and state:
+  std::swap(k_, other.k_);
+  std::swap(m_, other.m_);
+  std::swap(min_k_, other.min_k_);
+  std::swap(n_, other.n_);
+  std::swap(num_levels_, other.num_levels_);
+  std::swap(is_level_zero_sorted_, other.is_level_zero_sorted_);
+  return *this;
+}
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::~kll_sketch() {
+  // Only the slots [levels_[0], levels_[num_levels_]) hold constructed items;
+  // the rest of the buffer is raw storage and must not be destroyed.
+  if (items_ != nullptr) {
+    const uint32_t last = levels_[num_levels_];
+    for (uint32_t slot = levels_[0]; slot < last; ++slot) items_[slot].~T();
+    A().deallocate(items_, items_size_);
+  }
+  // min and max each live in their own single-element allocation.
+  auto destroy_single = [](T* value) {
+    if (value == nullptr) return;
+    value->~T();
+    A().deallocate(value, 1);
+  };
+  destroy_single(min_value_);
+  destroy_single(max_value_);
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update(const T& value) {
+  // Values rejected by check_update_value (NaN for floating-point T) are ignored.
+  if (check_update_value(value)) {
+    update_min_max(value);
+    // internal_update() reserves a fresh level-0 slot; construct a copy of value there.
+    new (&items_[internal_update()]) T(value);
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update(T&& value) {
+  // Values rejected by check_update_value (NaN for floating-point T) are ignored.
+  if (check_update_value(value)) {
+    // min/max are updated by copy first, so value is still intact for the move below.
+    update_min_max(value);
+    new (&items_[internal_update()]) T(std::move(value));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::update_min_max(const T& value) {
+  if (!is_empty()) {
+    // Widen the tracked [min, max] range using the sketch's comparator.
+    if (C()(value, *min_value_)) *min_value_ = value;
+    if (C()(*max_value_, value)) *max_value_ = value;
+    return;
+  }
+  // First value seen (called before n_ is incremented): it is both the minimum and the maximum.
+  min_value_ = new (A().allocate(1)) T(value);
+  max_value_ = new (A().allocate(1)) T(value);
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::internal_update() {
+  // Level 0 grows downward; once its start reaches index 0 the buffer is full and must be compacted.
+  if (levels_[0] == 0) compress_while_updating();
+  ++n_;
+  is_level_zero_sorted_ = false;
+  // Claim the slot just below the current start of level 0 and hand its index to the caller.
+  levels_[0] -= 1;
+  return levels_[0];
+}
+
+// Merges other into this sketch; other is only read (its items are copied).
+// Throws std::invalid_argument if the two sketches have different m parameters.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(const kll_sketch& other) {
+ if (other.is_empty()) return;
+ if (m_ != other.m_) {
+ throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+ }
+ // Widen [min, max] to cover other's extremes (or adopt them if this sketch is empty).
+ if (is_empty()) {
+ min_value_ = new (A().allocate(1)) T(*other.min_value_);
+ max_value_ = new (A().allocate(1)) T(*other.max_value_);
+ } else {
+ if (C()(*other.min_value_, *min_value_)) *min_value_ = *other.min_value_;
+ if (C()(*max_value_, *other.max_value_)) *max_value_ = *other.max_value_;
+ }
+ // Capture the final count now: the loop below bumps n_ once per level-0 item via internal_update(),
+ // and merge_higher_levels() needs the final total.
+ const uint64_t final_n = n_ + other.n_;
+ // other's level-0 items go through the regular update path (which may trigger compactions).
+ for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+ const uint32_t index = internal_update();
+ new (&items_[index]) T(other.items_[i]);
+ }
+ // Items on levels 1 and above need the dedicated multi-level merge.
+ if (other.num_levels_ >= 2) merge_higher_levels(other, final_n);
+ // Set the exact total regardless of what n_ accumulated above.
+ n_ = final_n;
+ // min_k_ tracks the smallest k among merged sketches that were in estimation mode.
+ if (other.is_estimation_mode()) min_k_ = std::min(min_k_, other.min_k_);
+ assert_correct_total_weight();
+}
+
+// Merges other into this sketch, moving (rather than copying) other's items and min/max values.
+// Throws std::invalid_argument if the two sketches have different m parameters.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::merge(kll_sketch&& other) {
+  if (other.is_empty()) return;
+  if (m_ != other.m_) {
+    throw std::invalid_argument("incompatible M: " + std::to_string(m_) + " and " + std::to_string(other.m_));
+  }
+  // Read everything still needed from other before it is handed off as an rvalue below,
+  // so nothing is read from a sketch whose contents may have been moved out.
+  const bool other_in_estimation_mode = other.is_estimation_mode();
+  const auto other_min_k = other.min_k_;
+  const bool other_has_higher_levels = other.num_levels_ >= 2;
+  if (is_empty()) {
+    min_value_ = new (A().allocate(1)) T(std::move(*other.min_value_));
+    max_value_ = new (A().allocate(1)) T(std::move(*other.max_value_));
+  } else {
+    if (C()(*other.min_value_, *min_value_)) *min_value_ = std::move(*other.min_value_);
+    if (C()(*max_value_, *other.max_value_)) *max_value_ = std::move(*other.max_value_);
+  }
+  // internal_update() bumps n_ per level-0 item; merge_higher_levels() needs the final total.
+  const uint64_t final_n = n_ + other.n_;
+  for (uint32_t i = other.levels_[0]; i < other.levels_[1]; i++) {
+    const uint32_t index = internal_update();
+    new (&items_[index]) T(std::move(other.items_[i]));
+  }
+  // other is a named rvalue reference, not a forwarding reference: std::move is the right cast.
+  if (other_has_higher_levels) merge_higher_levels(std::move(other), final_n);
+  n_ = final_n;
+  // min_k_ tracks the smallest k among merged sketches that were in estimation mode.
+  if (other_in_estimation_mode) min_k_ = std::min(min_k_, other_min_k);
+  assert_correct_total_weight();
+}
+
+// True until the first value is added.
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::is_empty() const {
+  return get_n() == 0;
+}
+
+// Total number of values the sketch has seen (including merged sketches).
+template<typename T, typename C, typename S, typename A>
+uint64_t kll_sketch<T, C, S, A>::get_n() const {
+  return n_;
+}
+
+// Number of items physically stored: everything from the start of level 0 to the end of the top level.
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::get_num_retained() const {
+  const uint32_t first = levels_[0];
+  const uint32_t past_last = levels_[num_levels_];
+  return past_last - first;
+}
+
+// Once a second level exists, some items stand for more than one input value.
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::is_estimation_mode() const {
+  return num_levels_ >= 2;
+}
+
+// Smallest value seen. For an empty sketch: NaN for floating-point T, otherwise get_invalid_value() throws.
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_min_value() const {
+  if (!is_empty()) return *min_value_;
+  return get_invalid_value();
+}
+
+// Largest value seen. For an empty sketch: NaN for floating-point T, otherwise get_invalid_value() throws.
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_max_value() const {
+  if (!is_empty()) return *max_value_;
+  return get_invalid_value();
+}
+
+// Approximate value at the given normalized rank (0 = min, 1 = max).
+// Throws std::invalid_argument unless 0 <= fraction <= 1 (NaN included).
+template<typename T, typename C, typename S, typename A>
+T kll_sketch<T, C, S, A>::get_quantile(double fraction) const {
+  if (is_empty()) return get_invalid_value();
+  if (fraction == 0.0) return *min_value_;
+  if (fraction == 1.0) return *max_value_;
+  // Written as a negated in-range test so that NaN (for which every comparison is false)
+  // is rejected instead of being passed on to the quantile calculator.
+  if (!((fraction >= 0.0) && (fraction <= 1.0))) {
+    throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
+  }
+  // has side effect of sorting level zero if needed
+  auto quantile_calculator(const_cast<kll_sketch*>(this)->get_quantile_calculator());
+  return quantile_calculator->get_quantile(fraction);
+}
+
+// Approximate values at each of the given normalized ranks; returns an empty vector for an
+// empty sketch. Throws std::invalid_argument if any fraction is outside [0, 1] or is NaN.
+template<typename T, typename C, typename S, typename A>
+std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(const double* fractions, uint32_t size) const {
+  std::vector<T, A> quantiles;
+  if (is_empty()) return quantiles;
+  // Built lazily: only fractions strictly inside (0, 1) need it.
+  std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> quantile_calculator;
+  quantiles.reserve(size);
+  for (uint32_t i = 0; i < size; i++) {
+    const double fraction = fractions[i];
+    // negated in-range test so that NaN (which fails every comparison) is rejected as well
+    if (!((fraction >= 0.0) && (fraction <= 1.0))) {
+      throw std::invalid_argument("Fraction cannot be less than zero or greater than 1.0");
+    }
+    if (fraction == 0.0) {
+      quantiles.push_back(*min_value_);
+    } else if (fraction == 1.0) {
+      quantiles.push_back(*max_value_);
+    } else {
+      if (!quantile_calculator) {
+        // has side effect of sorting level zero if needed
+        quantile_calculator = const_cast<kll_sketch*>(this)->get_quantile_calculator();
+      }
+      quantiles.push_back(quantile_calculator->get_quantile(fraction));
+    }
+  }
+  return quantiles;
+}
+
+// Values at num evenly spaced normalized ranks from 0 to 1 inclusive.
+// Returns an empty vector for an empty sketch; throws std::invalid_argument if num is 0.
+template<typename T, typename C, typename S, typename A>
+std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(size_t num) const {
+  if (is_empty()) return std::vector<T, A>();
+  if (num == 0) {
+    throw std::invalid_argument("num must be > 0");
+  }
+  const size_t last = num - 1;
+  std::vector<double> fractions(num);
+  fractions.front() = 0.0;
+  for (size_t i = 1; i <= last; i++) {
+    fractions[i] = static_cast<double>(i) / last;
+  }
+  // Pin the upper endpoint to exactly 1.0 so it maps to the stored max value.
+  if (last > 0) fractions[last] = 1.0;
+  return get_quantiles(fractions.data(), num);
+}
+
+// Approximate normalized rank of value: the weighted fraction of retained items that compare
+// less than it. NaN for an empty sketch.
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
+  if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
+  uint64_t total = 0;
+  uint64_t weight = 1;  // an item on level L carries weight 2^L
+  for (uint8_t level = 0; level < num_levels_; level++, weight *= 2) {
+    const bool level_sorted = (level > 0) || is_level_zero_sorted_;
+    const auto begin = levels_[level];
+    const auto end = levels_[level + 1];  // exclusive
+    for (uint32_t i = begin; i < end; i++) {
+      if (C()(items_[i], value)) {
+        total += weight;
+      } else if (level_sorted) {
+        break;  // in a sorted level no later item can be smaller either
+      }
+    }
+  }
+  return static_cast<double>(total) / n_;
+}
+
+// Approximate probability mass in each interval delimited by the split points (size + 1 buckets).
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_PMF(const T* split_points, uint32_t size) const {
+  const bool cumulative = false;
+  return get_PMF_or_CDF(split_points, size, cumulative);
+}
+
+// Approximate cumulative distribution at each split point (size + 1 buckets, the last one is 1).
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_CDF(const T* split_points, uint32_t size) const {
+  const bool cumulative = true;
+  return get_PMF_or_CDF(split_points, size, cumulative);
+}
+
+// Rank error of this sketch, evaluated at min_k_ (merging can lower it below k_).
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_normalized_rank_error(bool pmf) const {
+  const uint16_t effective_k = min_k_;
+  return get_normalized_rank_error(effective_k, pmf);
+}
+
+// implementation for fixed-size arithmetic types (integral and floating point)
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (is_empty()) return EMPTY_SIZE_BYTES;
+  const bool single_item = (num_levels_ == 1) && (get_num_retained() == 1);
+  if (single_item) return DATA_START_SINGLE_ITEM + sizeof(TT);
+  // Full layout: preamble, num_levels_ level offsets (the last offset is derivable and
+  // therefore not written), then min, max and every retained item.
+  const size_t num_values = get_num_retained() + 2;
+  return DATA_START + num_levels_ * sizeof(uint32_t) + num_values * sizeof(TT);
+}
+
+// implementation for all other types
+template<typename T, typename C, typename S, typename A>
+template<typename TT, typename std::enable_if<!std::is_arithmetic<TT>::value, int>::type>
+size_t kll_sketch<T, C, S, A>::get_serialized_size_bytes() const {
+  if (is_empty()) return EMPTY_SIZE_BYTES;
+  const bool single_item = (num_levels_ == 1) && (get_num_retained() == 1);
+  if (single_item) return DATA_START_SINGLE_ITEM + S().size_of_item(items_[levels_[0]]);
+  // Variable-size items: ask the serde for each one. The last level offset is derivable and not written.
+  size_t total = DATA_START + num_levels_ * sizeof(uint32_t);
+  total += S().size_of_item(*min_value_);
+  total += S().size_of_item(*max_value_);
+  for (const auto& entry: *this) total += S().size_of_item(entry.first);
+  return total;
+}
+
+// Writes the sketch to a stream. Layout: an 8-byte preamble (preamble_ints, serial version,
+// family, flags, k, m, unused); for sketches with more than one item it is followed by n,
+// min_k, num_levels, an unused byte, the level offsets and the min/max values; finally the
+// retained items. Casts use reinterpret_cast<const char*> so no const is cast away.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
+  const bool is_single_item = n_ == 1;
+  const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
+  os.write(reinterpret_cast<const char*>(&preamble_ints), sizeof(preamble_ints));
+  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
+  os.write(reinterpret_cast<const char*>(&serial_version), sizeof(serial_version));
+  const uint8_t family(FAMILY);
+  os.write(reinterpret_cast<const char*>(&family), sizeof(family));
+  const uint8_t flags_byte(
+    (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
+  );
+  os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
+  os.write(reinterpret_cast<const char*>(&k_), sizeof(k_));
+  os.write(reinterpret_cast<const char*>(&m_), sizeof(m_));
+  const uint8_t unused(0);
+  os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
+  if (is_empty()) return;
+  if (!is_single_item) {
+    os.write(reinterpret_cast<const char*>(&n_), sizeof(n_));
+    os.write(reinterpret_cast<const char*>(&min_k_), sizeof(min_k_));
+    os.write(reinterpret_cast<const char*>(&num_levels_), sizeof(num_levels_));
+    os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
+    // the last level offset is not written: deserialize() derives it from the capacity
+    os.write(reinterpret_cast<const char*>(levels_.data()), sizeof(levels_[0]) * num_levels_);
+    S().serialize(os, min_value_, 1);
+    S().serialize(os, max_value_, 1);
+  }
+  S().serialize(os, &items_[levels_[0]], get_num_retained());
+}
+
+// Serializes into a byte vector whose first header_size_bytes bytes are reserved for the caller;
+// the sketch (same layout as the stream overload) follows them.
+// Throws std::logic_error if the bytes written differ from the precomputed size.
+template<typename T, typename C, typename S, typename A>
+vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const {
+  const bool is_single_item = n_ == 1;
+  const size_t size = header_size_bytes + get_serialized_size_bytes();
+  vector_u8<A> bytes(size);
+  uint8_t* ptr = bytes.data() + header_size_bytes;
+  // End of the whole buffer. ptr already starts header_size_bytes in, so "ptr + size" would
+  // point header_size_bytes past the end and overstate the room available to the serde.
+  uint8_t* end_ptr = bytes.data() + size;
+  const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
+  ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
+  const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
+  ptr += copy_to_mem(&serial_version, ptr, sizeof(serial_version));
+  const uint8_t family(FAMILY);
+  ptr += copy_to_mem(&family, ptr, sizeof(family));
+  const uint8_t flags_byte(
+    (is_empty() ? 1 << flags::IS_EMPTY : 0)
+    | (is_level_zero_sorted_ ? 1 << flags::IS_LEVEL_ZERO_SORTED : 0)
+    | (is_single_item ? 1 << flags::IS_SINGLE_ITEM : 0)
+  );
+  ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
+  ptr += copy_to_mem(&k_, ptr, sizeof(k_));
+  ptr += copy_to_mem(&m_, ptr, sizeof(m_));
+  const uint8_t unused(0);
+  ptr += copy_to_mem(&unused, ptr, sizeof(unused));
+  if (!is_empty()) {
+    if (!is_single_item) {
+      ptr += copy_to_mem(&n_, ptr, sizeof(n_));
+      ptr += copy_to_mem(&min_k_, ptr, sizeof(min_k_));
+      ptr += copy_to_mem(&num_levels_, ptr, sizeof(num_levels_));
+      ptr += copy_to_mem(&unused, ptr, sizeof(unused));
+      // the last level offset is not written: deserialize() derives it from the capacity
+      ptr += copy_to_mem(levels_.data(), ptr, sizeof(levels_[0]) * num_levels_);
+      ptr += S().serialize(ptr, end_ptr - ptr, min_value_, 1);
+      ptr += S().serialize(ptr, end_ptr - ptr, max_value_, 1);
+    }
+    const size_t bytes_remaining = end_ptr - ptr;
+    ptr += S().serialize(ptr, bytes_remaining, &items_[levels_[0]], get_num_retained());
+  }
+  const size_t delta = ptr - bytes.data();
+  if (delta != size) throw std::logic_error("serialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
+  return bytes;
+}
+
+// Reads a sketch written by serialize(std::ostream&).
+// Throws std::runtime_error on a stream read failure and std::invalid_argument on corrupt
+// level offsets (header fields are validated by the check_* helpers).
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(std::istream& is) {
+  // Common 8-byte preamble, in the order serialize() writes it.
+  uint8_t preamble_ints;
+  is.read(reinterpret_cast<char*>(&preamble_ints), sizeof(preamble_ints));
+  uint8_t serial_version;
+  is.read(reinterpret_cast<char*>(&serial_version), sizeof(serial_version));
+  uint8_t family_id;
+  is.read(reinterpret_cast<char*>(&family_id), sizeof(family_id));
+  uint8_t flags_byte;
+  is.read(reinterpret_cast<char*>(&flags_byte), sizeof(flags_byte));
+  uint16_t k;
+  is.read(reinterpret_cast<char*>(&k), sizeof(k));
+  uint8_t m;
+  is.read(reinterpret_cast<char*>(&m), sizeof(m));
+  uint8_t unused;
+  is.read(reinterpret_cast<char*>(&unused), sizeof(unused));
+
+  check_m(m);
+  check_preamble_ints(preamble_ints, flags_byte);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+
+  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  if (is_empty) return kll_sketch<T, C, S, A>(k);
+
+  uint64_t n;
+  uint16_t min_k;
+  uint8_t num_levels;
+  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
+  if (is_single_item) {
+    n = 1;
+    min_k = k;
+    num_levels = 1;
+  } else {
+    is.read(reinterpret_cast<char*>(&n), sizeof(n_));
+    is.read(reinterpret_cast<char*>(&min_k), sizeof(min_k_));
+    is.read(reinterpret_cast<char*>(&num_levels), sizeof(num_levels));
+    is.read(reinterpret_cast<char*>(&unused), sizeof(unused));
+  }
+  vector_u32<A> levels(num_levels + 1);
+  const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
+  if (is_single_item) {
+    levels[0] = capacity - 1;
+  } else {
+    // the last integer in levels_ is not serialized because it can be derived
+    is.read(reinterpret_cast<char*>(levels.data()), sizeof(levels[0]) * num_levels);
+  }
+  levels[num_levels] = capacity;
+  // Fail fast on a truncated stream before any buffer is sized or indexed from the values just read.
+  if (!is.good()) throw std::runtime_error("error reading from std::istream");
+  // Level offsets must be non-decreasing up to capacity; otherwise levels[0] would index
+  // outside the items buffer allocated below.
+  for (uint8_t i = 0; i < num_levels; i++) {
+    if (levels[i] > levels[i + 1]) throw std::invalid_argument("corrupt levels in serialized sketch");
+  }
+  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value;
+  std::unique_ptr<T, item_deleter> max_value;
+  if (!is_single_item) {
+    S().deserialize(is, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    S().deserialize(is, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  const auto num_items = levels[num_levels] - levels[0];
+  S().deserialize(is, &items_buffer.get()[levels[0]], num_items);
+  // serde call did not throw, repackage with destructors
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
+  if (is_single_item) {
+    // min and max are not serialized for a single item: both are copies of it
+    new (min_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    new (max_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  if (!is.good())
+    throw std::runtime_error("error reading from std::istream");
+  return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
+    std::move(min_value), std::move(max_value), is_level_zero_sorted);
+}
+
+// Reads a sketch written by serialize(unsigned) (without the caller header) from a byte buffer.
+// Throws if the buffer is too short, the header is invalid, the level offsets are corrupt,
+// or the bytes consumed do not add up to size.
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A> kll_sketch<T, C, S, A>::deserialize(const void* bytes, size_t size) {
+  // The common preamble is 8 bytes; the larger full preamble is checked once its size is known.
+  ensure_minimum_memory(size, 8);
+  const char* ptr = static_cast<const char*>(bytes);
+  uint8_t preamble_ints;
+  ptr += copy_from_mem(ptr, &preamble_ints, sizeof(preamble_ints));
+  uint8_t serial_version;
+  ptr += copy_from_mem(ptr, &serial_version, sizeof(serial_version));
+  uint8_t family_id;
+  ptr += copy_from_mem(ptr, &family_id, sizeof(family_id));
+  uint8_t flags_byte;
+  ptr += copy_from_mem(ptr, &flags_byte, sizeof(flags_byte));
+  uint16_t k;
+  ptr += copy_from_mem(ptr, &k, sizeof(k));
+  uint8_t m;
+  ptr += copy_from_mem(ptr, &m, sizeof(m));
+  ptr++; // skip unused byte
+
+  check_m(m);
+  check_preamble_ints(preamble_ints, flags_byte);
+  check_serial_version(serial_version);
+  check_family_id(family_id);
+  // preamble_ints counts 32-bit words: serialize() writes 8 bytes for the short preamble
+  // and 20 bytes for the full one.
+  ensure_minimum_memory(size, preamble_ints * sizeof(uint32_t));
+
+  const bool is_empty(flags_byte & (1 << flags::IS_EMPTY));
+  if (is_empty) return kll_sketch<T, C, S, A>(k);
+
+  uint64_t n;
+  uint16_t min_k;
+  uint8_t num_levels;
+  const bool is_single_item(flags_byte & (1 << flags::IS_SINGLE_ITEM)); // used in serial version 2
+  const char* end_ptr = static_cast<const char*>(bytes) + size;
+  if (is_single_item) {
+    n = 1;
+    min_k = k;
+    num_levels = 1;
+  } else {
+    ptr += copy_from_mem(ptr, &n, sizeof(n));
+    ptr += copy_from_mem(ptr, &min_k, sizeof(min_k));
+    ptr += copy_from_mem(ptr, &num_levels, sizeof(num_levels));
+    ptr++; // skip unused byte
+  }
+  vector_u32<A> levels(num_levels + 1);
+  const uint32_t capacity(kll_helper::compute_total_capacity(k, m, num_levels));
+  if (is_single_item) {
+    levels[0] = capacity - 1;
+  } else {
+    // the last integer in levels_ is not serialized because it can be derived
+    const size_t levels_bytes = sizeof(levels[0]) * num_levels;
+    // num_levels comes from the input: make sure the buffer really holds that many offsets
+    ensure_minimum_memory(static_cast<size_t>(end_ptr - ptr), levels_bytes);
+    ptr += copy_from_mem(ptr, levels.data(), levels_bytes);
+  }
+  levels[num_levels] = capacity;
+  // Level offsets must be non-decreasing up to capacity; otherwise levels[0] would index
+  // outside the items buffer allocated below.
+  for (uint8_t i = 0; i < num_levels; i++) {
+    if (levels[i] > levels[i + 1]) throw std::invalid_argument("corrupt levels in serialized sketch");
+  }
+  auto item_buffer_deleter = [](T* ptr) { A().deallocate(ptr, 1); };
+  std::unique_ptr<T, decltype(item_buffer_deleter)> min_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, decltype(item_buffer_deleter)> max_value_buffer(A().allocate(1), item_buffer_deleter);
+  std::unique_ptr<T, item_deleter> min_value;
+  std::unique_ptr<T, item_deleter> max_value;
+  if (!is_single_item) {
+    ptr += S().deserialize(ptr, end_ptr - ptr, min_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    ptr += S().deserialize(ptr, end_ptr - ptr, max_value_buffer.get(), 1);
+    // serde call did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  auto items_buffer_deleter = [capacity](T* ptr) { A().deallocate(ptr, capacity); };
+  std::unique_ptr<T, decltype(items_buffer_deleter)> items_buffer(A().allocate(capacity), items_buffer_deleter);
+  const auto num_items = levels[num_levels] - levels[0];
+  ptr += S().deserialize(ptr, end_ptr - ptr, &items_buffer.get()[levels[0]], num_items);
+  // serde call did not throw, repackage with destructors
+  std::unique_ptr<T, items_deleter> items(items_buffer.release(), items_deleter(levels[0], capacity));
+  const size_t delta = ptr - static_cast<const char*>(bytes);
+  if (delta != size) throw std::logic_error("deserialized size mismatch: " + std::to_string(delta) + " != " + std::to_string(size));
+  const bool is_level_zero_sorted = (flags_byte & (1 << flags::IS_LEVEL_ZERO_SORTED)) > 0;
+  if (is_single_item) {
+    // min and max are not serialized for a single item: both are copies of it
+    new (min_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destructor
+    min_value = std::unique_ptr<T, item_deleter>(min_value_buffer.release(), item_deleter());
+    new (max_value_buffer.get()) T(items.get()[levels[0]]);
+    // copy did not throw, repackage with destructor
+    max_value = std::unique_ptr<T, item_deleter>(max_value_buffer.release(), item_deleter());
+  }
+  return kll_sketch(k, min_k, n, num_levels, std::move(levels), std::move(items), capacity,
+    std::move(min_value), std::move(max_value), is_level_zero_sorted);
+}
+
+/*
+ * Normalized rank error for a sketch with configuration parameter k.
+ * k   - the configuration parameter
+ * pmf - true: the "double-sided" error that applies to get_PMF();
+ *       false: the "single-sided" error that applies to all other queries.
+ * The constants are a best fit to the 99th percentile of the maximum error measured
+ * empirically over thousands of trials.
+ */
+template<typename T, typename C, typename S, typename A>
+double kll_sketch<T, C, S, A>::get_normalized_rank_error(uint16_t k, bool pmf) {
+  if (pmf) return 2.446 / pow(k, 0.9433);
+  return 2.296 / pow(k, 0.9723);
+}
+
+// for deserialization
+// Adopts the buffers assembled by deserialize(): ownership of items, min_value and max_value is
+// taken by releasing their unique_ptrs, and levels is moved in. items_size is the buffer capacity.
+// m_ is always DEFAULT_M here because no m is passed in; deserialize() runs check_m() on the
+// serialized m beforehand (presumably accepting only DEFAULT_M -- confirm).
+// NOTE(review): the mem-initializers must stay in member declaration order (declared elsewhere).
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::kll_sketch(uint16_t k, uint16_t min_k, uint64_t n, uint8_t num_levels, vector_u32<A>&& levels,
+ std::unique_ptr<T, items_deleter> items, uint32_t items_size, std::unique_ptr<T, item_deleter> min_value,
+ std::unique_ptr<T, item_deleter> max_value, bool is_level_zero_sorted):
+k_(k),
+m_(DEFAULT_M),
+min_k_(min_k),
+n_(n),
+num_levels_(num_levels),
+levels_(std::move(levels)),
+items_(items.release()),
+items_size_(items_size),
+min_value_(min_value.release()),
+max_value_(max_value.release()),
+is_level_zero_sorted_(is_level_zero_sorted)
+{}
+
+// The following code is only valid in the special case of exactly reaching capacity while updating.
+// It cannot be used while merging, while reducing k, or anything else.
+// Called from internal_update() when levels_[0] == 0, i.e. the item buffer is completely full.
+// Picks the lowest level at or over its capacity, sorts it if it is an unsorted level zero, keeps a
+// random half of it (kll_helper::randomly_halve_up/down) and folds that half into the level above,
+// merging with that level's items when it is non-empty. With an odd population one leftover item
+// stays behind in the compacted level. The levels below are then shifted up by the freed amount so
+// that level zero can keep growing downward.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::compress_while_updating(void) {
+ const uint8_t level = find_level_to_compact();
+
+ // It is important to add the new top level right here. Be aware that this operation
+ // grows the buffer and shifts the data and also the boundaries of the data and grows the
+ // levels array and increments num_levels_
+ if (level == (num_levels_ - 1)) {
+ add_empty_top_level_to_completely_full_sketch();
+ }
+
+ const uint32_t raw_beg = levels_[level];
+ const uint32_t raw_lim = levels_[level + 1];
+ // +2 is OK because we already added a new top level if necessary
+ const uint32_t pop_above = levels_[level + 2] - raw_lim;
+ const uint32_t raw_pop = raw_lim - raw_beg;
+ const bool odd_pop = kll_helper::is_odd(raw_pop);
+ // with an odd population the first item is set aside so that an even number gets halved
+ const uint32_t adj_beg = odd_pop ? raw_beg + 1 : raw_beg;
+ const uint32_t adj_pop = odd_pop ? raw_pop - 1 : raw_pop;
+ const uint32_t half_adj_pop = adj_pop / 2;
+ // old bottom of the live items; the half_adj_pop slots starting here are destroyed at the end
+ const uint32_t destroy_beg = levels_[0];
+
+ // level zero might not be sorted, so we must sort it if we wish to compact it
+ // sort_level_zero() is not used here because of the adjustment for odd number of items
+ if ((level == 0) && !is_level_zero_sorted_) {
+ std::sort(&items_[adj_beg], &items_[adj_beg + adj_pop], C());
+ }
+ // Empty level above: presumably the kept half lands in the upper part of the range, directly
+ // below raw_lim, and becomes that level. Otherwise the kept half is merged with the level above
+ // into the slots starting at adj_beg + half_adj_pop (argument meaning per kll_helper -- confirm).
+ if (pop_above == 0) {
+ kll_helper::randomly_halve_up(items_, adj_beg, adj_pop);
+ } else {
+ kll_helper::randomly_halve_down(items_, adj_beg, adj_pop);
+ kll_helper::merge_sorted_arrays<T, C>(items_, adj_beg, half_adj_pop, raw_lim, pop_above, adj_beg + half_adj_pop);
+ }
+ levels_[level + 1] -= half_adj_pop; // adjust boundaries of the level above
+ if (odd_pop) {
+ levels_[level] = levels_[level + 1] - 1; // the current level now contains one item
+ if (levels_[level] != raw_beg) items_[levels_[level]] = std::move(items_[raw_beg]); // namely this leftover guy
+ } else {
+ levels_[level] = levels_[level + 1]; // the current level is now empty
+ }
+
+ // verify that we freed up half_adj_pop array slots just below the current level
+ if (levels_[level] != (raw_beg + half_adj_pop)) throw std::logic_error("compaction error");
+
+ // finally, we need to shift up the data in the levels below
+ // so that the freed-up space can be used by level zero
+ if (level > 0) {
+ // move every item stored below the compacted level up by half_adj_pop slots...
+ const uint32_t amount = raw_beg - levels_[0];
+ std::move_backward(&items_[levels_[0]], &items_[levels_[0] + amount], &items_[levels_[0] + half_adj_pop + amount]);
+ // ...and move those levels' boundaries along with them
+ for (uint8_t lvl = 0; lvl < level; lvl++) levels_[lvl] += half_adj_pop;
+ }
+ // destroy the objects in the half_adj_pop slots at the old bottom, which no longer belong to any level
+ for (uint32_t i = 0; i < half_adj_pop; i++) items_[i + destroy_beg].~T();
+}
+
+// Returns the lowest level whose population has reached its capacity.
+// Throws std::logic_error if no level qualifies (which would mean the capacities are inconsistent).
+template<typename T, typename C, typename S, typename A>
+uint8_t kll_sketch<T, C, S, A>::find_level_to_compact() const {
+  for (uint8_t level = 0; ; level++) {
+    if (level >= num_levels_) throw std::logic_error("capacity calculation error");
+    const uint32_t population = levels_[level + 1] - levels_[level];
+    const uint32_t capacity = kll_helper::level_capacity(k_, num_levels_, level, m_);
+    if (population >= capacity) return level;
+  }
+}
+
+// Grows a completely full sketch (levels_[0] == 0) by one level. A new buffer is allocated that is
+// larger by the level-0 capacity computed for num_levels_ + 1 levels. The existing items are moved to
+// its top (offset by that amount) and every level boundary is shifted accordingly. An empty top level
+// is then appended, whose start and end both equal the new total capacity.
+// Throws std::logic_error if the expected buffer/level invariants do not hold.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::add_empty_top_level_to_completely_full_sketch() {
+ const uint32_t cur_total_cap = levels_[num_levels_];
+
+ // make sure that we are following a certain growth scheme
+ if (levels_[0] != 0) throw std::logic_error("full sketch expected");
+ if (items_size_ != cur_total_cap) throw std::logic_error("current capacity mismatch");
+
+ // note that merging MIGHT over-grow levels_, in which case we might not have to grow it here
+ const uint8_t new_levels_size = num_levels_ + 2;
+ if (levels_.size() < new_levels_size) {
+ levels_.resize(new_levels_size);
+ }
+
+ const uint32_t delta_cap = kll_helper::level_capacity(k_, num_levels_ + 1, 0, m_);
+ const uint32_t new_total_cap = cur_total_cap + delta_cap;
+
+ // move (and shift) the current data into the new buffer
+ // (the final 'true' presumably makes move_construct destroy the moved-from originals, since the old
+ // buffer is deallocated right after without running destructors -- confirm in kll_helper)
+ T* new_buf = A().allocate(new_total_cap);
+ kll_helper::move_construct<T>(items_, 0, cur_total_cap, new_buf, delta_cap, true);
+ A().deallocate(items_, items_size_);
+ items_ = new_buf;
+ items_size_ = new_total_cap;
+
+ // this loop includes the old "extra" index at the top
+ for (uint8_t i = 0; i <= num_levels_; i++) {
+ levels_[i] += delta_cap;
+ }
+
+ if (levels_[num_levels_] != new_total_cap) throw std::logic_error("new capacity mismatch");
+
+ num_levels_++;
+ levels_[num_levels_] = new_total_cap; // initialize the new "extra" index at the top
+}
+
+// Sorts level zero in place (once); higher levels are always kept sorted.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::sort_level_zero() {
+  if (is_level_zero_sorted_) return;
+  std::sort(items_ + levels_[0], items_ + levels_[1], C());
+  is_level_zero_sorted_ = true;
+}
+
+// Builds a quantile calculator over the retained items, allocated through A rebound to the
+// calculator type. Sorts level zero first (a side effect the const query methods rely on).
+template<typename T, typename C, typename S, typename A>
+std::unique_ptr<kll_quantile_calculator<T, C, A>, std::function<void(kll_quantile_calculator<T, C, A>*)>> kll_sketch<T, C, S, A>::get_quantile_calculator() {
+  sort_level_zero();
+  typedef kll_quantile_calculator<T, C, A> Calculator;
+  typedef typename std::allocator_traits<A>::template rebind_alloc<Calculator> AllocCalc;
+  // Deleter is created before allocating, so nothing after the allocation can throw except the constructor.
+  std::function<void(Calculator*)> deleter = [](Calculator* ptr) { ptr->~Calculator(); AllocCalc().deallocate(ptr, 1); };
+  Calculator* raw = AllocCalc().allocate(1);
+  try {
+    new (raw) Calculator(items_, levels_.data(), num_levels_, n_);
+  } catch (...) {
+    // the constructor threw: no object exists, but the storage must still be returned
+    AllocCalc().deallocate(raw, 1);
+    throw;
+  }
+  return std::unique_ptr<Calculator, std::function<void(Calculator*)>>(raw, std::move(deleter));
+}
+
+// Shared implementation of get_PMF()/get_CDF(): size + 1 buckets normalized by n_, made cumulative
+// when is_CDF is true. Returns an empty vector for an empty sketch.
+template<typename T, typename C, typename S, typename A>
+vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32_t size, bool is_CDF) const {
+  if (is_empty()) return vector_d<A>();
+  kll_helper::validate_values<T, C>(split_points, size);
+  vector_d<A> buckets(size + 1, 0);
+  // Accumulate raw weights per bucket, level by level; each level up doubles the item weight.
+  uint64_t weight = 1;
+  for (uint8_t level = 0; level < num_levels_; level++) {
+    const auto from_index = levels_[level];
+    const auto to_index = levels_[level + 1]; // exclusive
+    const bool level_sorted = (level > 0) || is_level_zero_sorted_;
+    if (level_sorted) {
+      increment_buckets_sorted_level(from_index, to_index, weight, split_points, size, buckets.data());
+    } else {
+      increment_buckets_unsorted_level(from_index, to_index, weight, split_points, size, buckets.data());
+    }
+    weight *= 2;
+  }
+  // Normalize by n_; for a CDF, replace each bucket with the running total first.
+  double running_total = 0;
+  for (uint32_t i = 0; i <= size; i++) {
+    if (is_CDF) {
+      running_total += buckets[i];
+      buckets[i] = running_total / n_;
+    } else {
+      buckets[i] /= n_;
+    }
+  }
+  return buckets;
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::increment_buckets_unsorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+    const T* split_points, uint32_t size, double* buckets) const
+{
+  // Linear scan per item: the item goes into the bucket of the first split point it compares
+  // less than; an item not less than any split point lands in the last bucket (index size).
+  for (uint32_t item_idx = from_index; item_idx < to_index; ++item_idx) {
+    uint32_t bucket = 0;
+    while (bucket < size && !C()(items_[item_idx], split_points[bucket])) {
+      ++bucket;
+    }
+    buckets[bucket] += weight;
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::increment_buckets_sorted_level(uint32_t from_index, uint32_t to_index, uint64_t weight,
+    const T* split_points, uint32_t size, double* buckets) const
+{
+  // Two-pointer walk over the sorted level items and the sorted split points.
+  uint32_t item_idx = from_index;
+  uint32_t bucket = 0;
+  while (item_idx < to_index && bucket < size) {
+    if (!C()(items_[item_idx], split_points[bucket])) {
+      ++bucket; // current item (and every later one) is past this bucket
+      continue;
+    }
+    buckets[bucket] += weight;
+    ++item_idx;
+  }
+  // If the split points ran out first, every remaining item belongs to the last bucket.
+  if (bucket == size) {
+    buckets[bucket] += weight * (to_index - item_idx);
+  }
+}
+
+// Merges the items above level zero of `other` into this sketch and recompacts the result.
+// Level zero of `other` must already have been inserted into this sketch by the caller (both
+// populate_work_arrays overloads rely on this). `other` is forwarded, so an rvalue selects the
+// populate_work_arrays overload that moves its items and an lvalue the one that copies them.
+// final_n is presumably the combined N after the merge; it bounds the number of levels, and
+// std::logic_error("merge error") is thrown if compaction produces more levels than that bound.
+template<typename T, typename C, typename S, typename A>
+template<typename O>
+void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
+ const uint32_t tmp_num_items = get_num_retained() + other.get_num_retained_above_level_zero();
+ // Scratch storage for every item taking part in compaction. The deleter only frees memory:
+ // the items are moved out again below via move_construct(..., true), which presumably
+ // destroys the moved-from objects — TODO confirm in kll_helper.
+ auto tmp_items_deleter = [tmp_num_items](T* ptr) { A().deallocate(ptr, tmp_num_items); }; // no destructor needed
+ const std::unique_ptr<T, decltype(tmp_items_deleter)> workbuf(A().allocate(tmp_num_items), tmp_items_deleter);
+ const uint8_t ub = kll_helper::ub_on_num_levels(final_n);
+ const size_t work_levels_size = ub + 2; // ub+1 does not work
+ vector_u32<A> worklevels(work_levels_size);
+ vector_u32<A> outlevels(work_levels_size);
+
+ const uint8_t provisional_num_levels = std::max(num_levels_, other.num_levels_);
+
+ populate_work_arrays(std::forward<O>(other), workbuf.get(), worklevels.data(), provisional_num_levels);
+
+ const kll_helper::compress_result result = kll_helper::general_compress<T, C>(k_, m_, provisional_num_levels, workbuf.get(),
+ worklevels.data(), outlevels.data(), is_level_zero_sorted_);
+
+ // ub can sometimes be much bigger
+ if (result.final_num_levels > ub) throw std::logic_error("merge error");
+
+ // now we need to transfer the results back into "this" sketch
+ // items_ holds no live objects here (populate_work_arrays moved them all out), so it can be
+ // released without running destructors when the capacity changes.
+ if (result.final_capacity != items_size_) {
+ A().deallocate(items_, items_size_);
+ items_size_ = result.final_capacity;
+ items_ = A().allocate(items_size_);
+ }
+ // Place the compacted items at the top of items_, leaving the free space at the bottom.
+ const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
+ kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);
+
+ if (levels_.size() < (result.final_num_levels + 1)) {
+ levels_.resize(result.final_num_levels + 1);
+ }
+ // Rebase the level boundaries from workbuf offsets to items_ offsets.
+ // NOTE(review): this rewrites every entry of levels_, including any beyond
+ // final_num_levels + 1 when the sketch previously had more levels; it relies on
+ // levels_.size() <= outlevels.size() (ub + 2) — TODO confirm.
+ const uint32_t offset = free_space_at_bottom - outlevels[0];
+ for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index
+ levels_[lvl] = outlevels[lvl] + offset;
+ }
+ num_levels_ = result.final_num_levels;
+}
+
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version copies objects from the incoming sketch
+// Lays out in workbuf: level 0 of this sketch, then for each level 1..provisional_num_levels-1
+// the union of both sketches' items at that level. worklevels[i] receives the start offset of
+// level i, and worklevels[provisional_num_levels] the end offset of the last level.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(const kll_sketch& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+ worklevels[0] = 0;
+
+ // the level zero data from "other" was already inserted into "this"
+ kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+ worklevels[1] = safe_level_size(0);
+
+ for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+ const uint32_t self_pop = safe_level_size(lvl);
+ const uint32_t other_pop = other.safe_level_size(lvl);
+ worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+ // A level populated in only one sketch is transferred as-is (this: moved, other: copied);
+ // a level populated in both is merged, which presumes levels above zero are sorted.
+ if ((self_pop > 0) && (other_pop == 0)) {
+ kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+ } else if ((self_pop == 0) && (other_pop > 0)) {
+ kll_helper::copy_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl]);
+ } else if ((self_pop > 0) && (other_pop > 0)) {
+ kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+ }
+ }
+}
+
+// this leaves items_ uninitialized (all objects moved out and destroyed)
+// this version moves objects from the incoming sketch
+// Same layout as the const& overload: level 0 of this sketch, then for each level
+// 1..provisional_num_levels-1 the union of both sketches' items, with the level start offsets
+// (plus a final end offset) written to worklevels.
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::populate_work_arrays(kll_sketch&& other, T* workbuf, uint32_t* worklevels, uint8_t provisional_num_levels) {
+ worklevels[0] = 0;
+
+ // the level zero data from "other" was already inserted into "this"
+ kll_helper::move_construct<T>(items_, levels_[0], levels_[1], workbuf, 0, true);
+ worklevels[1] = safe_level_size(0);
+
+ for (uint8_t lvl = 1; lvl < provisional_num_levels; lvl++) {
+ const uint32_t self_pop = safe_level_size(lvl);
+ const uint32_t other_pop = other.safe_level_size(lvl);
+ worklevels[lvl + 1] = worklevels[lvl] + self_pop + other_pop;
+
+ if ((self_pop > 0) && (other_pop == 0)) {
+ kll_helper::move_construct<T>(items_, levels_[lvl], levels_[lvl] + self_pop, workbuf, worklevels[lvl], true);
+ } else if ((self_pop == 0) && (other_pop > 0)) {
+ // Last argument false: presumably the moved-from objects in `other` are left for other's
+ // own destructor to destroy — TODO confirm in kll_helper.
+ kll_helper::move_construct<T>(other.items_, other.levels_[lvl], other.levels_[lvl] + other_pop, workbuf, worklevels[lvl], false);
+ } else if ((self_pop > 0) && (other_pop > 0)) {
+ // NOTE(review): same call as in the const& overload, so other's items in levels populated
+ // in both sketches are handled however merge_sorted_arrays treats its second input
+ // (presumably copied, not moved).
+ kll_helper::merge_sorted_arrays<T, C>(items_, levels_[lvl], self_pop, other.items_, other.levels_[lvl], other_pop, workbuf, worklevels[lvl]);
+ }
+ }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::assert_correct_total_weight() const {
+  // Consistency check: the weighted count of retained items must equal N.
+  const uint64_t total_weight = kll_helper::sum_the_sample_weights(num_levels_, levels_.data());
+  if (total_weight == n_) return;
+  throw std::logic_error("Total weight does not match N");
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::safe_level_size(uint8_t level) const {
+  // Levels at or above num_levels_ do not exist and therefore hold no items.
+  return (level < num_levels_) ? levels_[level + 1] - levels_[level] : 0;
+}
+
+template<typename T, typename C, typename S, typename A>
+uint32_t kll_sketch<T, C, S, A>::get_num_retained_above_level_zero() const {
+  // With more than one level, everything from the start of level 1 to the top index counts.
+  if (num_levels_ != 1) {
+    return levels_[num_levels_] - levels_[1];
+  }
+  return 0;
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_m(uint8_t m) {
+  // Deserialization guard: only DEFAULT_M is accepted.
+  if (m == DEFAULT_M) return;
+  throw std::invalid_argument("Possible corruption: M must be " + std::to_string(DEFAULT_M)
+    + ": " + std::to_string(m));
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_preamble_ints(uint8_t preamble_ints, uint8_t flags_byte) {
+  // Empty and single-item sketches use the short preamble; all others use the full one.
+  const bool empty_flag = (flags_byte & (1 << flags::IS_EMPTY)) != 0;
+  const bool single_item_flag = (flags_byte & (1 << flags::IS_SINGLE_ITEM)) != 0;
+  const bool expects_short_preamble = empty_flag || single_item_flag;
+  if (expects_short_preamble && preamble_ints != PREAMBLE_INTS_SHORT) {
+    throw std::invalid_argument("Possible corruption: preamble ints must be "
+      + std::to_string(PREAMBLE_INTS_SHORT) + " for an empty or single item sketch: " + std::to_string(preamble_ints));
+  }
+  if (!expects_short_preamble && preamble_ints != PREAMBLE_INTS_FULL) {
+    throw std::invalid_argument("Possible corruption: preamble ints must be "
+      + std::to_string(PREAMBLE_INTS_FULL) + " for a sketch with more than one item: " + std::to_string(preamble_ints));
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_serial_version(uint8_t serial_version) {
+  // Both known serial versions are accepted; anything else is treated as corruption.
+  const bool known_version = (serial_version == SERIAL_VERSION_1) || (serial_version == SERIAL_VERSION_2);
+  if (known_version) return;
+  throw std::invalid_argument("Possible corruption: serial version mismatch: expected "
+    + std::to_string(SERIAL_VERSION_1) + " or " + std::to_string(SERIAL_VERSION_2)
+    + ", got " + std::to_string(serial_version));
+}
+
+template<typename T, typename C, typename S, typename A>
+void kll_sketch<T, C, S, A>::check_family_id(uint8_t family_id) {
+  // The serialized family id must identify this sketch family.
+  if (family_id == FAMILY) return;
+  throw std::invalid_argument("Possible corruption: family mismatch: expected "
+    + std::to_string(FAMILY) + ", got " + std::to_string(family_id));
+}
+
+template <typename T, typename C, typename S, typename A>
+string<A> kll_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items) const {
+  // Human-readable report: a summary, then optionally per-level capacity/size and every
+  // retained item grouped by level.
+  std::basic_ostringstream<char, std::char_traits<char>, AllocChar<A>> os;
+  // Narrow integer fields are widened to unsigned int so they print as numbers rather than
+  // characters (presumably they are uint8_t).
+  os << "### KLL sketch summary:" << '\n';
+  os << " K : " << k_ << '\n';
+  os << " min K : " << min_k_ << '\n';
+  os << " M : " << static_cast<unsigned int>(m_) << '\n';
+  os << " N : " << n_ << '\n';
+  // Epsilon is shown with 3 significant digits. std::setprecision is sticky, so the previous
+  // precision is restored afterwards; otherwise floating-point min/max values and items printed
+  // below would also be cut to 3 significant digits.
+  const std::streamsize saved_precision = os.precision();
+  os << " Epsilon : " << std::setprecision(3) << get_normalized_rank_error(false) * 100 << "%" << '\n';
+  os << " Epsilon PMF : " << get_normalized_rank_error(true) * 100 << "%" << '\n';
+  os.precision(saved_precision);
+  os << " Empty : " << (is_empty() ? "true" : "false") << '\n';
+  os << " Estimation mode: " << (is_estimation_mode() ? "true" : "false") << '\n';
+  os << " Levels : " << static_cast<unsigned int>(num_levels_) << '\n';
+  os << " Sorted : " << (is_level_zero_sorted_ ? "true" : "false") << '\n';
+  os << " Capacity items : " << items_size_ << '\n';
+  os << " Retained items : " << get_num_retained() << '\n';
+  os << " Storage bytes : " << get_serialized_size_bytes() << '\n';
+  if (!is_empty()) {
+    os << " Min value : " << *min_value_ << '\n';
+    os << " Max value : " << *max_value_ << '\n';
+  }
+  os << "### End sketch summary" << '\n';
+
+  if (print_levels) {
+    os << "### KLL sketch levels:" << '\n';
+    os << " index: nominal capacity, actual size" << '\n';
+    for (uint8_t i = 0; i < num_levels_; i++) {
+      os << " " << static_cast<unsigned int>(i) << ": " << kll_helper::level_capacity(k_, num_levels_, i, m_) << ", " << safe_level_size(i) << '\n';
+    }
+    os << "### End sketch levels" << '\n';
+  }
+
+  if (print_items) {
+    os << "### KLL sketch data:" << '\n';
+    for (uint8_t level = 0; level < num_levels_; level++) {
+      const uint32_t from_index = levels_[level];
+      const uint32_t to_index = levels_[level + 1]; // exclusive
+      // Empty levels get no header line.
+      if (from_index < to_index) {
+        os << " level " << static_cast<unsigned int>(level) << ":" << '\n';
+      }
+      for (uint32_t i = from_index; i < to_index; i++) {
+        os << " " << items_[i] << '\n';
+      }
+    }
+    os << "### End sketch data" << '\n';
+  }
+  return os.str();
+}
+
+template <typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::begin() const {
+  // Iteration starts at level 0 and climbs through the levels (see const_iterator::operator++).
+  const uint32_t* const level_bounds = levels_.data();
+  return const_iterator(items_, level_bounds, num_levels_);
+}
+
+template <typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator kll_sketch<T, C, S, A>::end() const {
+  // Sentinel: null items/levels put the iterator at level == num_levels_, which is all that
+  // const_iterator::operator== looks at for end iterators.
+  const T* const no_items = nullptr;
+  const uint32_t* const no_levels = nullptr;
+  return const_iterator(no_items, no_levels, num_levels_);
+}
+
+// kll_sketch::const_iterator implementation
+
+template<typename T, typename C, typename S, typename A>
+kll_sketch<T, C, S, A>::const_iterator::const_iterator(const T* items, const uint32_t* levels, const uint8_t num_levels):
+items(items), levels(levels), num_levels(num_levels), index(levels == nullptr ? 0 : levels[0]), level(levels == nullptr ? num_levels : 0), weight(1)
+{
+  // A null `levels` builds the end() sentinel, which already sits at level == num_levels.
+  // Otherwise, skip empty leading levels (doubling the weight per level, as operator++ does)
+  // so that begin() never points at a non-existent item: an empty sketch then compares equal
+  // to end(), and the first item reports the weight of the level it actually lives on.
+  // index can stay at levels[0]: every skipped level is empty, so levels[0] == levels[level].
+  if (levels != nullptr) {
+    while (level < num_levels && levels[level] == levels[level + 1]) {
+      ++level;
+      weight *= 2;
+    }
+  }
+}
+
+template<typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++() {
+  // Advance within the current level; still inside it means we are done.
+  if (++index != levels[level + 1]) return *this;
+  // Reached the end of this level: climb to the next non-empty level (or to num_levels, the
+  // end position), doubling the item weight for every level climbed.
+  do {
+    ++level;
+    weight *= 2;
+  } while (level < num_levels && levels[level] == levels[level + 1]);
+  return *this;
+}
+
+// Post-increment: advances this iterator and is meant to return its previous position.
+// NOTE(review): this returns a reference to the local `tmp`, which is destroyed when the
+// function returns, so the returned reference dangles and using it is undefined behavior.
+// Post-increment should return `const_iterator` by value; that also requires changing the
+// declaration inside the class definition, which is outside this view.
+template<typename T, typename C, typename S, typename A>
+typename kll_sketch<T, C, S, A>::const_iterator& kll_sketch<T, C, S, A>::const_iterator::operator++(int) {
+ const_iterator tmp(*this);
+ operator++();
+ return tmp;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::const_iterator::operator==(const const_iterator& other) const {
+  // Iterators on different levels never match. Two iterators at the end position
+  // (level == num_levels) match regardless of index; otherwise compare positions.
+  if (level == other.level) {
+    return level == num_levels || index == other.index;
+  }
+  return false;
+}
+
+template<typename T, typename C, typename S, typename A>
+bool kll_sketch<T, C, S, A>::const_iterator::operator!=(const const_iterator& other) const {
+  // Defined as the negation of operator==.
+  return !(*this == other);
+}
+
+template<typename T, typename C, typename S, typename A>
+const std::pair<const T&, const uint64_t> kll_sketch<T, C, S, A>::const_iterator::operator*() const {
+  // Yields (reference to the current item, weight of its level).
+  const T& current_item = items[index];
+  return std::pair<const T&, const uint64_t>(current_item, weight);
+}
+
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::item_deleter {
+  public:
+  // Destroys one allocator-obtained item and returns its storage to A; null is a no-op.
+  void operator() (T* ptr) const {
+    if (ptr == nullptr) return;
+    ptr->~T();
+    A().deallocate(ptr, 1);
+  }
+};
+
+template<typename T, typename C, typename S, typename A>
+class kll_sketch<T, C, S, A>::items_deleter {
+  public:
+  items_deleter(uint32_t first_constructed, uint32_t capacity): start(first_constructed), num(capacity) {}
+  // Destroys the items at indices [start, num) and releases storage sized for num items;
+  // null is a no-op.
+  void operator() (T* ptr) const {
+    if (ptr == nullptr) return;
+    uint32_t i = start;
+    while (i < num) {
+      ptr[i].~T();
+      ++i;
+    }
+    A().deallocate(ptr, num);
+  }
+  private:
+  uint32_t start;
+  uint32_t num;
+};
+
+} /* namespace datasketches */
+
+#endif
diff --git a/be/src/thirdparty/datasketches/memory_operations.hpp b/be/src/thirdparty/datasketches/memory_operations.hpp
new file mode 100644
index 0000000..80dc3a3
--- /dev/null
+++ b/be/src/thirdparty/datasketches/memory_operations.hpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _MEMORY_OPERATIONS_HPP_
+#define _MEMORY_OPERATIONS_HPP_
+
+#include <cstring>
+#include <exception>
+#include <iostream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+
+namespace datasketches {
+
+// Throws std::out_of_range when fewer than min_needed bytes are available.
+static inline void ensure_minimum_memory(size_t bytes_available, size_t min_needed) {
+  if (bytes_available >= min_needed) return;
+  throw std::out_of_range("Insufficient buffer size detected: bytes available "
+    + std::to_string(bytes_available) + ", minimum needed " + std::to_string(min_needed));
+}
+
+// Throws std::out_of_range when requested_index exceeds capacity; equality is allowed.
+static inline void check_memory_size(size_t requested_index, size_t capacity) {
+  if (requested_index <= capacity) return;
+  throw std::out_of_range("Attempt to access memory beyond limits: requested index "
+    + std::to_string(requested_index) + ", capacity " + std::to_string(capacity));
+}
+
+// Copies `size` bytes from src into dst and returns the number of bytes consumed.
+// note: size is in bytes, not items
+static inline size_t copy_from_mem(const void* src, void* dst, size_t size) {
+  const size_t num_bytes = size;
+  memcpy(dst, src, num_bytes);
+  return num_bytes;
+}
+
+// Copies `size` bytes from src into dst and returns the number of bytes produced.
+// note: size is in bytes, not items
+static inline size_t copy_to_mem(const void* src, void* dst, size_t size) {
+  const size_t num_bytes = size;
+  memcpy(dst, src, num_bytes);
+  return num_bytes;
+}
+
+} // namespace
+
+#endif // _MEMORY_OPERATIONS_HPP_
diff --git a/be/src/thirdparty/datasketches/serde.hpp b/be/src/thirdparty/datasketches/serde.hpp
new file mode 100644
index 0000000..d0819d8
--- /dev/null
+++ b/be/src/thirdparty/datasketches/serde.hpp
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DATASKETCHES_SERDE_HPP_
+#define DATASKETCHES_SERDE_HPP_
+
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <exception>
+
+#include "memory_operations.hpp"
+
+namespace datasketches {
+
+// serialize and deserialize
+// Primary template: declares the serde interface only. No member definitions are given here,
+// so a type without a matching specialization (such as the ones below) presumably fails to
+// link when these members are used — TODO confirm no out-of-line definitions exist elsewhere.
+// Enable exists so specializations can be selected with std::enable_if.
+template<typename T, typename Enable = void> struct serde {
+ // stream serialization
+ void serialize(std::ostream& os, const T* items, unsigned num);
+ void deserialize(std::istream& is, T* items, unsigned num); // items allocated but not initialized
+
+ // raw bytes serialization
+ // In the specializations below, size_of_item returns the serialized size of one item in bytes,
+ // and the buffer overloads return the number of bytes written/read, bounded by capacity.
+ size_t size_of_item(const T& item);
+ size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num);
+ size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num); // items allocated but not initialized
+};
+
+// serde for all fixed-size arithmetic types (int and float of different sizes)
+// in particular, kll_sketch<int64_t> should produce sketches binary-compatible
+// with LongsSketch and ItemsSketch<Long> with ArrayOfLongsSerDe in Java
+// Items are copied as raw bytes in host byte order; no endianness conversion is done.
+template<typename T>
+struct serde<T, typename std::enable_if<std::is_arithmetic<T>::value>::type> {
+  // Writes num items as raw bytes.
+  // Throws std::runtime_error if the stream throws or is left in a non-good state.
+  void serialize(std::ostream& os, const T* items, unsigned num) {
+    bool failure = false;
+    try {
+      os.write(reinterpret_cast<const char*>(items), sizeof(T) * num);
+    } catch (const std::ios_base::failure&) {
+      failure = true;
+    }
+    if (failure || !os.good()) {
+      throw std::runtime_error("error writing to std::ostream with " + std::to_string(num) + " items");
+    }
+  }
+  // Reads num items as raw bytes into items (storage allocated but not initialized).
+  // Throws std::runtime_error if the stream throws or ends before all bytes are read.
+  void deserialize(std::istream& is, T* items, unsigned num) {
+    bool failure = false;
+    try {
+      is.read(reinterpret_cast<char*>(items), sizeof(T) * num);
+    } catch (const std::ios_base::failure&) {
+      failure = true;
+    }
+    if (failure || !is.good()) {
+      throw std::runtime_error("error reading from std::istream with " + std::to_string(num) + " items");
+    }
+  }
+
+  // Fixed-size types always serialize to sizeof(T) bytes.
+  size_t size_of_item(const T&) {
+    return sizeof(T);
+  }
+  // Copies num items into the buffer at ptr and returns the number of bytes written.
+  // check_memory_size throws std::out_of_range if capacity is too small.
+  size_t serialize(void* ptr, size_t capacity, const T* items, unsigned num) {
+    const size_t bytes_written = sizeof(T) * num;
+    check_memory_size(bytes_written, capacity);
+    // memcpy requires valid pointers even for a zero-byte copy; skip the empty case so a
+    // null buffer or item pointer with num == 0 is not undefined behavior.
+    if (bytes_written > 0) memcpy(ptr, items, bytes_written);
+    return bytes_written;
+  }
+  // Copies num items out of the buffer at ptr into items (storage allocated but not
+  // initialized) and returns the number of bytes read; throws std::out_of_range via
+  // check_memory_size if capacity is too small.
+  size_t deserialize(const void* ptr, size_t capacity, T* items, unsigned num) {
+    const size_t bytes_read = sizeof(T) * num;
+    check_memory_size(bytes_read, capacity);
+    if (bytes_read > 0) memcpy(items, ptr, bytes_read);
+    return bytes_read;
+  }
+};
+
+// serde for std::string items
+// This should produce sketches binary-compatible with
+// ItemsSketch<String> with ArrayOfStringsSerDe in Java.
+// The length of each string is stored as a 32-bit integer (historically),
+// which may be too wasteful. Treat this as an example.
+// Format per item: a uint32_t length in host byte order, followed by `length` raw chars
+// with no terminator.
+// NOTE(review): lengths are narrowed from size() to uint32_t, so strings longer than
+// UINT32_MAX bytes would be written with a truncated length.
+template<>
+struct serde<std::string> {
+ // Throws std::runtime_error when the stream throws or is no longer good; the message reports
+ // the loop position i at which writing stopped.
+ void serialize(std::ostream& os, const std::string* items, unsigned num) {
+ unsigned i = 0;
+ bool failure = false;
+ try {
+ for (; i < num && os.good(); i++) {
+ uint32_t length = items[i].size();
+ os.write((char*)&length, sizeof(length));
+ os.write(items[i].c_str(), length);
+ }
+ } catch (std::ostream::failure& e) {
+ failure = true;
+ }
+ if (failure || !os.good()) {
+ throw std::runtime_error("error writing to std::ostream at item " + std::to_string(i));
+ }
+ }
+ // Constructs num strings in place in raw (uninitialized) storage. On any read failure, the
+ // strings already constructed (indices < i) are destroyed and std::runtime_error is thrown.
+ void deserialize(std::istream& is, std::string* items, unsigned num) {
+ unsigned i = 0;
+ bool failure = false;
+ try {
+ for (; i < num; i++) {
+ uint32_t length;
+ is.read((char*)&length, sizeof(length));
+ if (!is.good()) { break; }
+ std::string str;
+ // NOTE(review): length comes straight from the stream; a corrupt value reserves up to
+ // ~4 GiB before the per-char reads below hit end of stream — consider validating it.
+ str.reserve(length);
+ for (uint32_t j = 0; j < length; j++) {
+ str.push_back(is.get());
+ }
+ if (!is.good()) { break; }
+ new (&items[i]) std::string(std::move(str));
+ }
+ } catch (std::istream::failure& e) {
+ failure = true;
+ }
+ if (failure || !is.good()) {
+ // clean up what we've already allocated
+ for (unsigned j = 0; j < i; ++j) {
+ items[j].~basic_string();
+ }
+ throw std::runtime_error("error reading from std::istream at item " + std::to_string(i));
+ }
+ }
+ // Serialized size: the 4-byte length prefix plus the characters.
+ size_t size_of_item(const std::string& item) {
+ return sizeof(uint32_t) + item.size();
+ }
+ // Writes items into the buffer and returns the bytes written. check_memory_size throws
+ // std::out_of_range before an item that would exceed capacity; earlier items stay written.
+ size_t serialize(void* ptr, size_t capacity, const std::string* items, unsigned num) {
+ size_t bytes_written = 0;
+ for (unsigned i = 0; i < num; ++i) {
+ const uint32_t length = items[i].size();
+ const size_t new_bytes = length + sizeof(length);
+ check_memory_size(bytes_written + new_bytes, capacity);
+ memcpy(ptr, &length, sizeof(length));
+ ptr = static_cast<char*>(ptr) + sizeof(uint32_t);
+ memcpy(ptr, items[i].c_str(), length);
+ ptr = static_cast<char*>(ptr) + length;
+ bytes_written += new_bytes;
+ }
+ return bytes_written;
+ }
+ // Constructs num strings in place from the buffer and returns the bytes read. On a short
+ // buffer, the strings already constructed are destroyed, bytes_read is pushed past capacity,
+ // and check_memory_size is reused to throw std::out_of_range.
+ size_t deserialize(const void* ptr, size_t capacity, std::string* items, unsigned num) {
+ size_t bytes_read = 0;
+ unsigned i = 0;
+ bool failure = false;
+ for (; i < num && !failure; ++i) {
+ uint32_t length;
+ if (bytes_read + sizeof(length) > capacity) {
+ bytes_read += sizeof(length); // we'll use this to report the error
+ failure = true;
+ break;
+ }
+ memcpy(&length, ptr, sizeof(length));
+ ptr = static_cast<const char*>(ptr) + sizeof(uint32_t);
+ bytes_read += sizeof(length);
+
+ if (bytes_read + length > capacity) {
+ bytes_read += length; // we'll use this to report the error
+ failure = true;
+ break;
+ }
+ new (&items[i]) std::string(static_cast<const char*>(ptr), length);
+ ptr = static_cast<const char*>(ptr) + length;
+ bytes_read += length;
+ }
+
+ if (failure) {
+ // clean up what we've already allocated
+ for (unsigned j = 0; j < i; ++j)
+ items[j].~basic_string();
+ // using this for a consistent error message
+ check_memory_size(bytes_read, capacity);
+ }
+
+ return bytes_read;
+ }
+};
+
+} /* namespace datasketches */
+
+# endif