You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/05/12 01:41:02 UTC
[impala] branch master updated: IMPALA-10282: Implement
ds_cpc_sketch() and ds_cpc_estimate() functions
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new e39c30b IMPALA-10282: Implement ds_cpc_sketch() and ds_cpc_estimate() functions
e39c30b is described below
commit e39c30b3cd94cc334068203a948bacdabf3eea41
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Tue Oct 27 15:11:29 2020 +0800
IMPALA-10282: Implement ds_cpc_sketch() and ds_cpc_estimate() functions
These functions can be used to get cardinality estimates of data
using the CPC algorithm from Apache DataSketches. ds_cpc_sketch()
receives a dataset, e.g. a column from a table, and returns a
serialized CPC sketch in string format. This can be written to a
table or be fed directly to ds_cpc_estimate() that returns the
cardinality estimate for that sketch.
Similar to the HLL sketch, the primary use-case for the CPC sketch
is for counting distinct values as a stream, and then merging
multiple sketches together for a total distinct count.
For more details about Apache DataSketches' CPC see:
http://datasketches.apache.org/docs/CPC/CPC.html
For a figures-of-merit comparison of the HLL and CPC sketches, see:
https://datasketches.apache.org/docs/DistinctCountMeritComparisons.html
Testing:
- Added some tests running estimates for small datasets where the
amount of data is small enough to get the correct results.
- Ran manual tests on tpch_parquet.lineitem to compare performance
with ndv(). Depending on data characteristics, ndv() appears 2x-3x
faster. CPC gives closer estimates than the current ndv(), and CPC is
more accurate than HLL in some cases.
Change-Id: I731e66fbadc74bc339c973f4d9337db9b7dd715a
Reviewed-on: http://gerrit.cloudera.org:8080/16656
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 121 +++++++++++++
be/src/exprs/aggregate-functions.h | 9 +
be/src/exprs/datasketches-common.cc | 4 +
be/src/exprs/datasketches-common.h | 4 +
be/src/exprs/datasketches-functions-ir.cc | 12 ++
be/src/exprs/datasketches-functions.h | 6 +
common/function-registry/impala_functions.py | 2 +
.../java/org/apache/impala/catalog/BuiltinsDb.java | 43 +++++
testdata/data/README | 6 +
testdata/data/cpc_sketches_from_hive.parquet | Bin 0 -> 2906 bytes
.../queries/QueryTest/datasketches-cpc.test | 187 +++++++++++++++++++++
tests/query_test/test_datasketches.py | 4 +
12 files changed, 398 insertions(+)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index e52b69d..774a64d 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,6 +40,8 @@
+#include <new>
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/cpc_sketch.hpp"
+#include "thirdparty/datasketches/cpc_union.hpp"
#include "thirdparty/datasketches/theta_sketch.hpp"
#include "thirdparty/datasketches/theta_union.hpp"
#include "thirdparty/datasketches/theta_intersection.hpp"
@@ -1640,6 +1642,17 @@ StringVal SerializeDsHllUnion(FunctionContext* ctx,
return SerializeCompactDsHllSketch(ctx, sketch);
}
+/// Serializes 'sketch' and returns the resulting bytes as a StringVal allocated through
+/// 'ctx'. Kept in the .cc file so that the header does not have to pull in the
+/// DataSketches CPC headers.
+StringVal SerializeDsCpcSketch(
+    FunctionContext* ctx, const datasketches::cpc_sketch& sketch) {
+  std::stringstream sketch_bytes;
+  sketch.serialize(sketch_bytes);
+  return StringStreamToStringVal(ctx, sketch_bytes);
+}
+}
+
/// Auxiliary function that receives a theta_sketch and returns the serialized version of
/// it wrapped into a StringVal.
/// Introducing this function in the .cc to avoid including the whole DataSketches Theta
@@ -1852,6 +1865,97 @@ StringVal AggregateFunctions::DsHllUnionFinalize(FunctionContext* ctx,
return result;
}
+/// Initializes the aggregation slot 'dst' with an empty CPC sketch configured with
+/// DS_CPC_SKETCH_CONFIG. If the buffer allocation fails, 'dst' is left NULL and the query
+/// status is expected to already carry the error.
+void AggregateFunctions::DsCpcInit(FunctionContext* ctx, StringVal* dst) {
+  AllocBuffer(ctx, dst, sizeof(datasketches::cpc_sketch));
+  if (UNLIKELY(dst->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  // Note that cpc_sketch always has the same size regardless of the amount of data it
+  // tracks: it is a wrapper class that holds all the inserted data on the heap. Only the
+  // wrapper object itself is stored in the StringVal.
+  // The freshly allocated buffer does not contain a constructed cpc_sketch yet, so the
+  // object has to be constructed in place. Assigning to it would call operator= on an
+  // object whose lifetime never began, which is undefined behavior.
+  new (dst->ptr) datasketches::cpc_sketch(DS_CPC_SKETCH_CONFIG);
+}
+
+/// Feeds the value of 'src' into the CPC sketch stored in the slot 'dst'. NULL inputs
+/// leave the sketch unchanged.
+template <typename T>
+void AggregateFunctions::DsCpcUpdate(FunctionContext* ctx, const T& src, StringVal* dst) {
+  if (!src.is_null) {
+    DCHECK(!dst->is_null);
+    DCHECK_EQ(dst->len, sizeof(datasketches::cpc_sketch));
+    reinterpret_cast<datasketches::cpc_sketch*>(dst->ptr)->update(src.val);
+  }
+}
+
+// StringVal specialization: NULL and empty strings are skipped, otherwise the raw bytes
+// of the string are fed into the sketch.
+template <>
+void AggregateFunctions::DsCpcUpdate(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  const bool nothing_to_add = src.is_null || src.len == 0;
+  if (nothing_to_add) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::cpc_sketch));
+  auto* target = reinterpret_cast<datasketches::cpc_sketch*>(dst->ptr);
+  target->update(reinterpret_cast<char*>(src.ptr), src.len);
+}
+
+/// Serializes the CPC sketch held in the aggregation slot 'src' into a new StringVal and
+/// releases the slot.
+StringVal AggregateFunctions::DsCpcSerialize(FunctionContext* ctx, const StringVal& src) {
+  using CpcSketch = datasketches::cpc_sketch;
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(CpcSketch));
+  CpcSketch* sketch_ptr = reinterpret_cast<CpcSketch*>(src.ptr);
+  StringVal dst = SerializeDsCpcSketch(ctx, *sketch_ptr);
+  // cpc_sketch keeps the inserted data on the heap, so its destructor has to run before
+  // the slot buffer is released; freeing only the buffer would leak that memory.
+  sketch_ptr->~CpcSketch();
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+/// Merges the serialized CPC sketch 'src' into the sketch stored in the slot 'dst'. If
+/// 'src' cannot be deserialized, the error is reported through 'ctx' and 'dst' is left
+/// untouched.
+void AggregateFunctions::DsCpcMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  DCHECK(!src.is_null);
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::cpc_sketch));
+  datasketches::cpc_sketch incoming;
+  const bool deserialized = DeserializeDsSketch(src, &incoming);
+  if (!deserialized) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
+  auto* accumulated = reinterpret_cast<datasketches::cpc_sketch*>(dst->ptr);
+  // Combine both sketches through a cpc_union and store the result back into the slot.
+  datasketches::cpc_union combiner(DS_CPC_SKETCH_CONFIG);
+  combiner.update(incoming);
+  combiner.update(*accumulated);
+  *accumulated = combiner.get_result();
+}
+
+/// Returns the cardinality estimate of the CPC sketch held in the slot 'src' and releases
+/// the slot. Returns NULL when the estimate is 0, e.g. when every input value was NULL
+/// (DsCpcUpdate() skips those).
+BigIntVal AggregateFunctions::DsCpcFinalize(FunctionContext* ctx, const StringVal& src) {
+  using CpcSketch = datasketches::cpc_sketch;
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(CpcSketch));
+  CpcSketch* sketch_ptr = reinterpret_cast<CpcSketch*>(src.ptr);
+  BigIntVal estimate = sketch_ptr->get_estimate();
+  // The sketch owns heap memory: destroy it before releasing the slot buffer, otherwise
+  // that memory is leaked.
+  sketch_ptr->~CpcSketch();
+  ctx->Free(src.ptr);
+  return (estimate == 0) ? BigIntVal::null() : estimate;
+}
+
+/// Returns the CPC sketch held in the slot 'src' in serialized form, or NULL when its
+/// estimate is 0 (e.g. when only NULL values were seen). Releases the slot.
+StringVal AggregateFunctions::DsCpcFinalizeSketch(
+    FunctionContext* ctx, const StringVal& src) {
+  using CpcSketch = datasketches::cpc_sketch;
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(CpcSketch));
+  CpcSketch* sketch_ptr = reinterpret_cast<CpcSketch*>(src.ptr);
+  StringVal result_str = StringVal::null();
+  if (sketch_ptr->get_estimate() > 0.0) {
+    result_str = SerializeDsCpcSketch(ctx, *sketch_ptr);
+  }
+  // cpc_sketch keeps its data on the heap: run the destructor before releasing the slot
+  // buffer, otherwise that memory is leaked.
+  sketch_ptr->~CpcSketch();
+  ctx->Free(src.ptr);
+  return result_str;
+}
+
void AggregateFunctions::DsThetaInit(FunctionContext* ctx, StringVal* dst) {
AllocBuffer(ctx, dst, sizeof(datasketches::update_theta_sketch));
if (UNLIKELY(dst->is_null)) {
@@ -3216,6 +3320,23 @@ template void AggregateFunctions::DsHllUpdate(
template void AggregateFunctions::DsHllUpdate(
FunctionContext*, const DateVal&, StringVal*);
+// Explicit instantiations of DsCpcUpdate() for the non-string input types (the StringVal
+// variant is the explicit specialization defined earlier in this file). Only the
+// TINYINT, INT, BIGINT, FLOAT and DOUBLE variants are referenced from BuiltinsDb.java's
+// DS_CPC_UPDATE_SYMBOL; the BOOLEAN, SMALLINT and DATE instantiations are currently not
+// registered as builtins.
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const BooleanVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const TinyIntVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const SmallIntVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const IntVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const BigIntVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const FloatVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const DoubleVal&, StringVal*);
+template void AggregateFunctions::DsCpcUpdate(
+    FunctionContext*, const DateVal&, StringVal*);
+
template void AggregateFunctions::DsThetaUpdate(
FunctionContext*, const BooleanVal&, StringVal*);
template void AggregateFunctions::DsThetaUpdate(
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 104eda8..ae95439 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -250,6 +250,15 @@ class AggregateFunctions {
static void DsHllUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
static StringVal DsHllUnionFinalize(FunctionContext*, const StringVal& src);
+  /// These functions implement Apache DataSketches CPC support for sketching.
+  /// The intermediate StringVal slot holds a datasketches::cpc_sketch object created by
+  /// DsCpcInit(). DsCpcSerialize() emits the serialized sketch, DsCpcMerge() folds a
+  /// serialized sketch into the slot, DsCpcFinalize() returns the estimate (NULL when it
+  /// is 0) and DsCpcFinalizeSketch() returns the serialized sketch (NULL when its
+  /// estimate is 0).
+  /// Note: these exact signatures are referenced by mangled name from BuiltinsDb.java.
+  static void DsCpcInit(FunctionContext*, StringVal* slot);
+  template <typename T>
+  static void DsCpcUpdate(FunctionContext*, const T& src, StringVal* dst);
+  static StringVal DsCpcSerialize(FunctionContext*, const StringVal& src);
+  static void DsCpcMerge(FunctionContext*, const StringVal& src, StringVal* dst);
+  static BigIntVal DsCpcFinalize(FunctionContext*, const StringVal& src);
+  static StringVal DsCpcFinalizeSketch(FunctionContext*, const StringVal& src);
+
/// These functions implement Apache DataSketches Theta support for sketching.
static void DsThetaInit(FunctionContext*, StringVal* slot);
template <typename T>
diff --git a/be/src/exprs/datasketches-common.cc b/be/src/exprs/datasketches-common.cc
index 060277c..cfb5a68 100644
--- a/be/src/exprs/datasketches-common.cc
+++ b/be/src/exprs/datasketches-common.cc
@@ -19,12 +19,14 @@
#include "common/logging.h"
#include "udf/udf-internal.h"
+#include "thirdparty/datasketches/cpc_sketch.hpp"
#include "thirdparty/datasketches/kll_sketch.hpp"
#include "thirdparty/datasketches/theta_sketch.hpp"
namespace impala {
using datasketches::hll_sketch;
+using datasketches::cpc_sketch;
using datasketches::kll_sketch;
using datasketches::theta_sketch;
using datasketches::compact_theta_sketch;
@@ -71,6 +73,8 @@ bool DeserializeDsSketch(const StringVal& serialized_sketch,
template bool DeserializeDsSketch(const StringVal& serialized_sketch,
hll_sketch* sketch);
template bool DeserializeDsSketch(const StringVal& serialized_sketch,
+    cpc_sketch* sketch);  // Used by DsCpcMerge() and DsCpcEstimate().
+template bool DeserializeDsSketch(const StringVal& serialized_sketch,
kll_sketch<float>* sketch);
StringVal StringStreamToStringVal(FunctionContext* ctx, const stringstream& str_stream) {
diff --git a/be/src/exprs/datasketches-common.h b/be/src/exprs/datasketches-common.h
index 20a7863..7187399 100644
--- a/be/src/exprs/datasketches-common.h
+++ b/be/src/exprs/datasketches-common.h
@@ -41,6 +41,10 @@ const datasketches::target_hll_type DS_HLL_TYPE = datasketches::target_hll_type:
/// buckets equals 2^DS_SKETCH_CONFIG.
const int DS_SKETCH_CONFIG = 12;
+/// Log2 of the 'k' parameter used when creating Apache DataSketches CPC sketches and
+/// unions. CPC is configured as 11 because it is comparable to an HLL sketch configured
+/// with 12.
+/// NOTE(review): the 4..21 range quoted for DS_SKETCH_CONFIG is the HLL range; the
+/// DataSketches CPC documentation gives lg_k in [4, 26]. Confirm against the bundled
+/// cpc_common.hpp before relying on either bound.
+const int DS_CPC_SKETCH_CONFIG = 11;
+
/// 'kappa' is a number of standard deviations from the mean: 1, 2 or 3 (default 2).
const int DS_DEFAULT_KAPPA = 2;
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index 1bf3deb..c754d2f 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -20,6 +20,7 @@
#include "exprs/datasketches-common.h"
#include "gutil/strings/substitute.h"
#include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/cpc_sketch.hpp"
#include "thirdparty/datasketches/theta_sketch.hpp"
#include "thirdparty/datasketches/theta_union.hpp"
#include "thirdparty/datasketches/theta_intersection.hpp"
@@ -108,6 +109,17 @@ StringVal DataSketchesFunctions::DsHllStringify(FunctionContext* ctx,
return dst;
}
+/// Returns the count(distinct) estimate stored in a serialized CPC sketch. NULL or empty
+/// input yields NULL; input that is not a CPC sketch reports an error through 'ctx' and
+/// yields NULL.
+BigIntVal DataSketchesFunctions::DsCpcEstimate(
+    FunctionContext* ctx, const StringVal& serialized_sketch) {
+  const bool has_payload = !serialized_sketch.is_null && serialized_sketch.len != 0;
+  if (!has_payload) return BigIntVal::null();
+  datasketches::cpc_sketch decoded(DS_CPC_SKETCH_CONFIG);
+  const bool decoded_ok = DeserializeDsSketch(serialized_sketch, &decoded);
+  if (decoded_ok) return decoded.get_estimate();
+  LogSketchDeserializationError(ctx);
+  return BigIntVal::null();
+}
+
BigIntVal DataSketchesFunctions::DsThetaEstimate(
FunctionContext* ctx, const StringVal& serialized_sketch) {
if (serialized_sketch.is_null || serialized_sketch.len == 0) return 0;
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index 7013945..1218c19 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -64,6 +64,12 @@ public:
static StringVal DsHllStringify(FunctionContext* ctx,
const StringVal& serialized_sketch);
+  /// 'serialized_sketch' is expected as a serialized Apache DataSketches CPC sketch. If
+  /// it is not, then a deserialization error is reported and the query fails.
+  /// Otherwise, returns the count(distinct) estimate from the sketch. Returns NULL if
+  /// 'serialized_sketch' is NULL or empty.
+  static BigIntVal DsCpcEstimate(
+      FunctionContext* ctx, const StringVal& serialized_sketch);
+
/// 'serialized_sketch' is expected as a serialized Apache DataSketches Theta sketch.
/// If it is not, then the query fails. Otherwise, returns the count(distinct) estimate
/// from the sketch.
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index fde4c71..ffd2f61 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -1003,6 +1003,8 @@ visible_functions = [
'_ZN6impala21DataSketchesFunctions11DsHllUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
[['ds_hll_stringify'], 'STRING', ['STRING'],
'_ZN6impala21DataSketchesFunctions14DsHllStringifyEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
+ [['ds_cpc_estimate'], 'BIGINT', ['STRING'],
+ '_ZN6impala21DataSketchesFunctions13DsCpcEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
[['ds_theta_estimate'], 'BIGINT', ['STRING'],
'_ZN6impala21DataSketchesFunctions15DsThetaEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
[['ds_theta_exclude'], 'STRING', ['STRING', 'STRING'],
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index fa76cf1..38576ad 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -400,6 +400,22 @@ public class BuiltinsDb extends Db {
"11DsHllUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
.build();
+  // Mangled-symbol suffixes (appended to 'prefix' at registration time) of the
+  // DsCpcUpdate() instantiation used for each argument type accepted by ds_cpc_sketch()
+  // and ds_cpc_sketch_and_estimate(). Types without an entry here are registered as
+  // unsupported builtins.
+  private static final Map<Type, String> DS_CPC_UPDATE_SYMBOL =
+      ImmutableMap.<Type, String>builder()
+          .put(Type.TINYINT,
+              "11DsCpcUpdateIN10impala_udf10TinyIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+          .put(Type.INT,
+              "11DsCpcUpdateIN10impala_udf6IntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+          .put(Type.BIGINT,
+              "11DsCpcUpdateIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+          .put(Type.FLOAT,
+              "11DsCpcUpdateIN10impala_udf8FloatValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+          .put(Type.DOUBLE,
+              "11DsCpcUpdateIN10impala_udf9DoubleValEEEvPNS2_15FunctionContextERKT_PNS2_9StringValE")
+          // StringVal is both the argument and the slot type, hence the PS3_ back-reference.
+          .put(Type.STRING,
+              "11DsCpcUpdateIN10impala_udf9StringValEEEvPNS2_15FunctionContextERKT_PS3_")
+          .build();
+
private static final Map<Type, String> DS_THETA_UPDATE_SYMBOL =
ImmutableMap.<Type, String>builder()
.put(Type.TINYINT,
@@ -1101,6 +1117,33 @@ public class BuiltinsDb extends Db {
Lists.newArrayList(t), Type.STRING, Type.STRING));
}
+      // DataSketches CPC: ds_cpc_sketch_and_estimate() and ds_cpc_sketch() share the
+      // init/update/merge/serialize symbols and differ only in return type and finalize.
+      if (!DS_CPC_UPDATE_SYMBOL.containsKey(t)) {
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db,
+            "ds_cpc_sketch_and_estimate", Lists.newArrayList(t), Type.STRING,
+            Type.STRING));
+        db.addBuiltin(AggregateFunction.createUnsupportedBuiltin(db, "ds_cpc_sketch",
+            Lists.newArrayList(t), Type.STRING, Type.STRING));
+      } else {
+        String dsCpcInitSymbol =
+            prefix + "9DsCpcInitEPN10impala_udf15FunctionContextEPNS1_9StringValE";
+        String dsCpcUpdateSymbol = prefix + DS_CPC_UPDATE_SYMBOL.get(t);
+        String dsCpcMergeSymbol = prefix
+            + "10DsCpcMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_";
+        String dsCpcSerializeSymbol = prefix
+            + "14DsCpcSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE";
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_cpc_sketch_and_estimate",
+            Lists.newArrayList(t), Type.BIGINT, Type.STRING,
+            dsCpcInitSymbol, dsCpcUpdateSymbol, dsCpcMergeSymbol, dsCpcSerializeSymbol,
+            prefix + "13DsCpcFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+            true, false, true));
+        db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_cpc_sketch",
+            Lists.newArrayList(t), Type.STRING, Type.STRING,
+            dsCpcInitSymbol, dsCpcUpdateSymbol, dsCpcMergeSymbol, dsCpcSerializeSymbol,
+            prefix + "19DsCpcFinalizeSketchEPN10impala_udf15FunctionContextERKNS1_"
+                + "9StringValE",
+            true, false, true));
+      }
+
// DataSketches Theta
if (DS_THETA_UPDATE_SYMBOL.containsKey(t)) {
db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_sketch_and_estimate",
diff --git a/testdata/data/README b/testdata/data/README
index 1663af9..74e5a4b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -509,6 +509,12 @@ hll_sketches_from_impala.parquet:
This holds the same sketches as hll_sketches_from_hive.parquet but these sketches were
created by Impala instead of Hive.
+cpc_sketches_from_hive.parquet:
+This file contains a table that has some string columns to store serialized Apache
+DataSketches CPC sketches created by Hive. Each column contains a sketch for a
+specific data type. Covers the following types: TINYINT, INT, BIGINT, FLOAT, DOUBLE,
+STRING, CHAR and VARCHAR. Has an additional column for NULL values.
+
theta_sketches_from_hive.parquet:
This file contains a table that has some string columns to store serialized Apache
DataSketches Theta sketches created by Hive. Each column contains a sketch for a
diff --git a/testdata/data/cpc_sketches_from_hive.parquet b/testdata/data/cpc_sketches_from_hive.parquet
new file mode 100644
index 0000000..4e31cee
Binary files /dev/null and b/testdata/data/cpc_sketches_from_hive.parquet differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
new file mode 100644
index 0000000..2f6db47
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-cpc.test
@@ -0,0 +1,187 @@
+====
+---- QUERY
+# Use a small table for testing DataSketches CPC functions through Impala to make sure
+# that these approximate functions give the correct result. To test Impala
+# functionality there is no need to test how DataSketches CPC approximates count
+# distinct values, so a small table is enough.
+select
+ ds_cpc_estimate(ds_cpc_sketch(tinyint_col)),
+ ds_cpc_estimate(ds_cpc_sketch(int_col)),
+ ds_cpc_estimate(ds_cpc_sketch(bigint_col)),
+ ds_cpc_estimate(ds_cpc_sketch(float_col)),
+ ds_cpc_estimate(ds_cpc_sketch(double_col)),
+ ds_cpc_estimate(ds_cpc_sketch(string_col))
+from functional_parquet.alltypessmall
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+select
+ ds_cpc_sketch_and_estimate(tinyint_col),
+ ds_cpc_sketch_and_estimate(int_col),
+ ds_cpc_sketch_and_estimate(bigint_col),
+ ds_cpc_sketch_and_estimate(float_col),
+ ds_cpc_sketch_and_estimate(double_col),
+ ds_cpc_sketch_and_estimate(string_col)
+from functional_parquet.alltypessmall
+---- RESULTS
+10,10,10,10,10,10
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that unsupported types give error with ds_cpc_sketch().
+select ds_cpc_sketch(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch(BOOLEAN)
+====
+---- QUERY
+select ds_cpc_sketch(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch(SMALLINT)
+====
+---- QUERY
+select ds_cpc_sketch(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch(DATE)
+====
+---- QUERY
+select ds_cpc_sketch(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch(DECIMAL(9,0))
+====
+---- QUERY
+# Check that unsupported types give error with ds_cpc_sketch_and_estimate().
+select ds_cpc_sketch_and_estimate(bool_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch_and_estimate(BOOLEAN)
+====
+---- QUERY
+select ds_cpc_sketch_and_estimate(smallint_col) from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch_and_estimate(SMALLINT)
+====
+---- QUERY
+select ds_cpc_sketch_and_estimate(cast(date_string_col as date format 'MM/DD/YYYY'))
+from functional_parquet.alltypessmall;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch_and_estimate(DATE)
+====
+---- QUERY
+select ds_cpc_sketch_and_estimate(d1) from functional_parquet.decimal_tbl;
+---- CATCH
+AnalysisException: No matching function with signature: ds_cpc_sketch_and_estimate(DECIMAL(9,0))
+====
+---- QUERY
+# Check if CPC works with null values.
+select
+ ds_cpc_estimate(ds_cpc_sketch(null_str)),
+ ds_cpc_estimate(ds_cpc_sketch(null_int)),
+ ds_cpc_estimate(ds_cpc_sketch(null_double)),
+ ds_cpc_estimate(ds_cpc_sketch(some_nulls)),
+ ds_cpc_sketch_and_estimate(null_str),
+ ds_cpc_sketch_and_estimate(null_int),
+ ds_cpc_sketch_and_estimate(null_double),
+ ds_cpc_sketch_and_estimate(some_nulls)
+from functional_parquet.nullrows;
+---- RESULTS
+NULL,NULL,NULL,6,NULL,NULL,NULL,6
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Check if CPC works for empty datasets.
+select
+ ds_cpc_estimate(ds_cpc_sketch(field)),
+ ds_cpc_estimate(ds_cpc_sketch(f2)),
+ ds_cpc_sketch_and_estimate(field),
+ ds_cpc_sketch_and_estimate(f2)
+from functional_parquet.emptytable;
+---- RESULTS
+NULL,NULL,NULL,NULL
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT
+====
+---- QUERY
+# Write sketches to a table as string and get an estimate from the written sketch.
+# Note, the plan is to write sketches as binary instead of strings. For this we have to
+# wait for the binary support (IMPALA-9482).
+create table cpc_sketch_store
+ (year int, month int, date_sketch string, float_sketch string)
+stored as parquet;
+insert into cpc_sketch_store
+ select
+ year,
+ month,
+ ds_cpc_sketch(date_string_col),
+ ds_cpc_sketch(float_col)
+ from functional_parquet.alltypessmall
+ group by year, month;
+select
+ year,
+ month,
+ ds_cpc_estimate(date_sketch),
+ ds_cpc_estimate(float_sketch)
+from cpc_sketch_store order by month;
+---- RESULTS
+2009,1,3,10
+2009,2,3,10
+2009,3,3,10
+2009,4,3,10
+---- TYPES
+INT,INT,BIGINT,BIGINT
+====
+---- QUERY
+# Check that ds_cpc_estimate returns error for strings that are not serialized sketches.
+select ds_cpc_estimate(date_string_col) from functional_parquet.alltypestiny;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Check that ds_cpc_estimate returns null for null and empty string inputs.
+select ds_cpc_estimate(b), ds_cpc_estimate(c) from functional_parquet.nulltable;
+---- RESULTS
+NULL,NULL
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+# Check that sketches made by Hive can be read and used for estimating by Impala.
+select
+ ds_cpc_estimate(ti) as ti,
+ ds_cpc_estimate(i) as i,
+ ds_cpc_estimate(bi) as bi,
+ ds_cpc_estimate(f) as f,
+ ds_cpc_estimate(d) as d,
+ ds_cpc_estimate(s) as s,
+ ds_cpc_estimate(c) as c,
+ ds_cpc_estimate(v) as v,
+ ds_cpc_estimate(nc) as nc
+from cpc_sketches_from_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,NULL
+====
+---- QUERY
+# Check if CPC works with empty strings.
+create table empty_string (s string, v varchar(1), c char(1));
+insert into empty_string values
+ ("", cast("" as varchar(1)), cast("" as char(1))),
+ ("a", cast("a" as varchar(1)), cast("a" as char(1))),
+ ("", cast("" as varchar(1)), cast("" as char(1))),
+ ("b", cast("b" as varchar(1)), cast("b" as char(1))),
+ ("b", cast("b" as varchar(1)), cast("b" as char(1)));
+select
+ ds_cpc_estimate(ds_cpc_sketch(s)),
+ ds_cpc_estimate(ds_cpc_sketch(v)),
+ ds_cpc_estimate(ds_cpc_sketch(c))
+from empty_string
+---- RESULTS
+2,2,3
+---- TYPES
+BIGINT,BIGINT,BIGINT
+====
diff --git a/tests/query_test/test_datasketches.py b/tests/query_test/test_datasketches.py
index ce80c12..117045f 100644
--- a/tests/query_test/test_datasketches.py
+++ b/tests/query_test/test_datasketches.py
@@ -37,6 +37,10 @@ class TestDatasketches(ImpalaTestSuite):
create_table_from_parquet(self.client, unique_database, 'hll_sketches_from_impala')
self.run_test_case('QueryTest/datasketches-hll', vector, unique_database)
+ def test_cpc(self, vector, unique_database):
+ create_table_from_parquet(self.client, unique_database, 'cpc_sketches_from_hive')
+ self.run_test_case('QueryTest/datasketches-cpc', vector, unique_database)
+
def test_theta(self, vector, unique_database):
create_table_from_parquet(self.client, unique_database, 'theta_sketches_from_hive')
create_table_from_parquet(self.client, unique_database, 'theta_sketches_from_impala')