You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/13 02:24:27 UTC
[impala] 03/08: IMPALA-10520: Implement ds_theta_intersect()
function
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0d22e89df452d3e21912b60c72a087a3e8c38057
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Thu Feb 4 15:24:25 2021 +0800
IMPALA-10520: Implement ds_theta_intersect() function
This function receives a set of serialized Apache DataSketches Theta
sketches produced by ds_theta_sketch() and intersects them into a
single sketch.
An example usage is to create a sketch for each partition of a table,
write these sketches to a separate table and intersect them to get
estimates based on the partitions the user is interested in related
sketches. E.g.:
SELECT
ds_theta_estimate(ds_theta_intersect(sketch_col))
FROM sketch_tbl
WHERE partition_col=1 OR partition_col=5;
Testing:
- Apart from the automated tests I added to this patch I also
tested ds_theta_intersect() on a bigger dataset to check that
serialization, deserialization and merging steps work well. I
took TPCH25.linelitem, created a number of sketches with grouping
by l_shipdate and called ds_theta_intersect() on those sketches
Change-Id: I80e68c2151c4604f0386d3dfb004c82b10293f97
Reviewed-on: http://gerrit.cloudera.org:8080/17088
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 87 ++++++++++++++++++++++
be/src/exprs/aggregate-functions.h | 9 +++
.../java/org/apache/impala/catalog/BuiltinsDb.java | 14 ++++
.../queries/QueryTest/datasketches-theta.test | 72 ++++++++++++++++++
4 files changed, 182 insertions(+)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 23ed6d3..f2a078a 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -42,6 +42,7 @@
#include "thirdparty/datasketches/hll.hpp"
#include "thirdparty/datasketches/theta_sketch.hpp"
#include "thirdparty/datasketches/theta_union.hpp"
+#include "thirdparty/datasketches/theta_intersection.hpp"
#include "thirdparty/datasketches/kll_sketch.hpp"
#include "util/arithmetic-util.h"
#include "util/mpfit-util.h"
@@ -1660,6 +1661,22 @@ StringVal SerializeDsThetaUnion(
return SerializeDsThetaSketch(ctx, sketch);
}
+/// Auxiliary function that receives a theta_intersection, gets the underlying Theta
+/// sketch from the intersection object and returns the serialized, compacted Theta sketch
+/// wrapped into StringVal (may be null).
+/// Introducing this function in the .cc to avoid including the whole DataSketches Theta
+/// functionality into the header.
+StringVal SerializeDsThetaIntersection(
+ FunctionContext* ctx, const datasketches::theta_intersection& ds_intersection) {
+ std::stringstream serialized_input;
+ // Calling get_result() before calling update() is undefined, so you need to check.
+ if (ds_intersection.has_result()) {
+ datasketches::compact_theta_sketch sketch = ds_intersection.get_result();
+ return SerializeDsThetaSketch(ctx, sketch);
+ }
+ return StringVal::null();
+}
+
/// Auxiliary function that receives a kll_sketch<float> and returns the serialized
/// version of it wrapped into a StringVal.
/// Introducing this function in the .cc to avoid including the whole DataSketches HLL
@@ -2030,6 +2047,76 @@ StringVal AggregateFunctions::DsThetaUnionFinalize(
return result;
}
+void AggregateFunctions::DsThetaIntersectInit(FunctionContext* ctx, StringVal* slot) {
+ AllocBuffer(ctx, slot, sizeof(datasketches::theta_intersection));
+ if (UNLIKELY(slot->is_null)) {
+ DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+ return;
+ }
+ datasketches::theta_intersection* intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(slot->ptr);
+ *intersection_ptr = datasketches::theta_intersection();
+}
+
+void AggregateFunctions::DsThetaIntersectUpdate(
+ FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+ if (src.is_null) return;
+ DCHECK(!dst->is_null);
+ DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
+ try {
+ auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+ if (src_sketch->is_empty()) return;
+ datasketches::theta_intersection* intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+ intersection_ptr->update(*src_sketch);
+ } catch (const std::exception&) {
+ LogSketchDeserializationError(ctx);
+ }
+}
+
+StringVal AggregateFunctions::DsThetaIntersectSerialize(
+ FunctionContext* ctx, const StringVal& src) {
+ DCHECK(!src.is_null);
+ DCHECK_EQ(src.len, sizeof(datasketches::theta_intersection));
+ datasketches::theta_intersection* intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(src.ptr);
+ StringVal dst = SerializeDsThetaIntersection(ctx, *intersection_ptr);
+ ctx->Free(src.ptr);
+ return dst;
+}
+
+void AggregateFunctions::DsThetaIntersectMerge(
+ FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+ if (src.is_null) return;
+ DCHECK(!dst->is_null);
+ DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
+
+ // Note, 'src' is a serialized compact_theta_sketch and not a serialized
+ // theta_intersection.
+ auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+
+ datasketches::theta_intersection* dst_intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+
+ dst_intersection_ptr->update(*src_sketch);
+}
+
+StringVal AggregateFunctions::DsThetaIntersectFinalize(
+ FunctionContext* ctx, const StringVal& src) {
+ DCHECK(!src.is_null);
+ DCHECK_EQ(src.len, sizeof(datasketches::theta_intersection));
+ datasketches::theta_intersection* intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(src.ptr);
+ if (!intersection_ptr->has_result()) {
+ ctx->Free(src.ptr);
+ return StringVal::null();
+ }
+ auto sketch = intersection_ptr->get_result();
+ StringVal result = SerializeDsThetaSketch(ctx, sketch);
+ ctx->Free(src.ptr);
+ return result;
+}
+
void AggregateFunctions::DsKllInitHelper(FunctionContext* ctx, StringVal* slot) {
AllocBuffer(ctx, slot, sizeof(datasketches::kll_sketch<float>));
if (UNLIKELY(slot->is_null)) {
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 474e23d..104eda8 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -266,6 +266,15 @@ class AggregateFunctions {
static void DsThetaUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
static StringVal DsThetaUnionFinalize(FunctionContext*, const StringVal& src);
+ /// These functions implement ds_theta_intersect().
+ static void DsThetaIntersectInit(FunctionContext*, StringVal* slot);
+ static void DsThetaIntersectUpdate(
+ FunctionContext*, const StringVal& src, StringVal* dst);
+ static StringVal DsThetaIntersectSerialize(FunctionContext*, const StringVal& src);
+ static void DsThetaIntersectMerge(
+ FunctionContext*, const StringVal& src, StringVal* dst);
+ static StringVal DsThetaIntersectFinalize(FunctionContext*, const StringVal& src);
+
/// These functions implement Apache DataSketches KLL support for sketching.
static void DsKllInit(FunctionContext*, StringVal* slot);
static void DsKllUpdate(FunctionContext*, const FloatVal& src, StringVal* dst);
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 6a1a609..fa76cf1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1434,6 +1434,20 @@ public class BuiltinsDb extends Db {
"20DsThetaUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
true, false, true));
+ // DataSketches Theta intersect
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_intersect",
+ Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
+ prefix + "20DsThetaIntersectInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+ prefix +
+ "22DsThetaIntersectUpdateEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ prefix +
+ "21DsThetaIntersectMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+ prefix +
+ "25DsThetaIntersectSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ prefix +
+ "24DsThetaIntersectFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+ true, false, true));
+
for (Type t: Type.getSupportedTypes()) {
if (t.isNull()) continue; // NULL is handled through type promotion.
if (t.isScalarType(PrimitiveType.CHAR)) continue; // promoted to STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
index 84ae7f9..fe7da72 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
@@ -233,3 +233,75 @@ BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
---- RESULTS
5,7,6,6,7,4,4,3,0
====
+---- QUERY
+# Intersect the sketches in sketch_store and check.
+select
+ ds_theta_estimate(ds_theta_intersect(date_sketch)),
+ ds_theta_estimate(ds_theta_intersect(float_sketch))
+from sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+0,10
+====
+---- QUERY
+# Checks that ds_theta_intersect() produces NULL for an empty dataset.
+select ds_theta_intersect(field) from functional_parquet.emptytable;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Checks that ds_theta_intersect() produces NULL for NULL inputs.
+select ds_theta_intersect(null_str) from functional_parquet.nullrows;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# ds_theta_intersect() returns an error if it receives an invalid serialized sketch.
+select ds_theta_intersect(date_string_col) from functional_parquet.alltypestiny
+where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Check that sketches made by Hive for intersection and estimation, which is consistent
+# with direct estimation of these sketches.
+select
+ ds_theta_estimate(ds_theta_intersect(ti)) as ti,
+ ds_theta_estimate(ds_theta_intersect(i)) as i,
+ ds_theta_estimate(ds_theta_intersect(bi)) as bi,
+ ds_theta_estimate(ds_theta_intersect(f)) as f,
+ ds_theta_estimate(ds_theta_intersect(d)) as d,
+ ds_theta_estimate(ds_theta_intersect(s)) as s,
+ ds_theta_estimate(ds_theta_intersect(c)) as c,
+ ds_theta_estimate(ds_theta_intersect(v)) as v,
+ ds_theta_estimate(ds_theta_intersect(nc)) as nc
+from theta_sketches_from_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
+---- QUERY
+# Intersect the sketches from theta_sketches_impala_hive and checks if the intersection
+# produces the same result as if these sketches were used separately to get the estimates.
+select
+ ds_theta_estimate(ds_theta_intersect(ti)) as ti,
+ ds_theta_estimate(ds_theta_intersect(i)) as i,
+ ds_theta_estimate(ds_theta_intersect(bi)) as bi,
+ ds_theta_estimate(ds_theta_intersect(f)) as f,
+ ds_theta_estimate(ds_theta_intersect(d)) as d,
+ ds_theta_estimate(ds_theta_intersect(s)) as s,
+ ds_theta_estimate(ds_theta_intersect(c)) as c,
+ ds_theta_estimate(ds_theta_intersect(v)) as v,
+ ds_theta_estimate(ds_theta_intersect(nc)) as nc
+from theta_sketches_impala_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
\ No newline at end of file