You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/04/08 12:01:55 UTC
[impala] 01/02: IMPALA-10632: Update the Theta sketch serialization interface
This is an automated email from the ASF dual-hosted git repository.
laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit ed0faaffb79557702b0ef0b952806bb632b62188
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Fri Mar 19 19:57:01 2021 +0800
IMPALA-10632: Update the Theta sketch serialization interface
DataSketches 3.0.0 removes serialization of the Update Theta sketch, so this change
serializes the Compact Theta sketch instead, preserving backward compatibility.
Testing:
- Ran the tests in tests/query_test/test_datasketches.py
Change-Id: I80470863097a4836ee07fe44babaef0c852f3051
Reviewed-on: http://gerrit.cloudera.org:8080/17261
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exprs/aggregate-functions-ir.cc | 58 ++++++++++++++++++-------------
be/src/exprs/datasketches-common.cc | 16 +++++----
be/src/exprs/datasketches-functions-ir.cc | 16 +++------
3 files changed, 48 insertions(+), 42 deletions(-)
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index f2a078a..e52b69d 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1645,7 +1645,7 @@ StringVal SerializeDsHllUnion(FunctionContext* ctx,
/// Introducing this function in the .cc to avoid including the whole DataSketches Theta
/// functionality into the header.
StringVal SerializeDsThetaSketch(
- FunctionContext* ctx, const datasketches::theta_sketch& sketch) {
+ FunctionContext* ctx, const datasketches::compact_theta_sketch& sketch) {
std::stringstream serialized_input(std::ios::in | std::ios::out | std::ios::binary);
sketch.serialize(serialized_input);
return StringStreamToStringVal(ctx, serialized_input);
@@ -1898,8 +1898,8 @@ StringVal AggregateFunctions::DsThetaSerialize(
|| src.len == sizeof(datasketches::theta_union));
StringVal dst;
if (src.len == sizeof(datasketches::update_theta_sketch)) {
- auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
- dst = SerializeDsThetaSketch(ctx, *sketch_ptr);
+ auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
+ dst = SerializeDsThetaSketch(ctx, sketch_ptr->compact());
} else {
auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
dst = SerializeDsThetaUnion(ctx, *union_ptr);
@@ -1915,9 +1915,12 @@ void AggregateFunctions::DsThetaMerge(
DCHECK(dst->len == sizeof(datasketches::update_theta_sketch)
or dst->len == sizeof(datasketches::theta_union));
- // Note, 'src' is a serialized theta_sketch.
- auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
- if (src_sketch->is_empty()) return;
+ // Note, 'src' is a serialized compact_theta_sketch.
+ std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+ if (!DeserializeDsSketch(src, &src_sketch)) {
+ LogSketchDeserializationError(ctx);
+ return;
+ }
if (dst->len == sizeof(datasketches::theta_union)) {
auto dst_union_ptr = reinterpret_cast<datasketches::theta_union*>(dst->ptr);
@@ -1951,7 +1954,7 @@ BigIntVal AggregateFunctions::DsThetaFinalize(
or src.len == sizeof(datasketches::theta_union));
BigIntVal estimate;
if (src.len == sizeof(datasketches::update_theta_sketch)) {
- auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
+ auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
estimate = sketch_ptr->get_estimate();
} else {
auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
@@ -1968,8 +1971,8 @@ StringVal AggregateFunctions::DsThetaFinalizeSketch(
or src.len == sizeof(datasketches::theta_union));
StringVal result;
if (src.len == sizeof(datasketches::update_theta_sketch)) {
- auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
- result = SerializeDsThetaSketch(ctx, *sketch_ptr);
+ auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
+ result = SerializeDsThetaSketch(ctx, sketch_ptr->compact());
} else {
auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
result = SerializeDsThetaUnion(ctx, *union_ptr);
@@ -1995,14 +1998,14 @@ void AggregateFunctions::DsThetaUnionUpdate(
if (src.is_null) return;
DCHECK(!dst->is_null);
DCHECK_EQ(dst->len, sizeof(datasketches::theta_union));
- try {
- auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
- datasketches::theta_union* union_ptr =
- reinterpret_cast<datasketches::theta_union*>(dst->ptr);
- union_ptr->update(*src_sketch);
- } catch (const std::exception&) {
+ std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+ if (!DeserializeDsSketch(src, &src_sketch)) {
LogSketchDeserializationError(ctx);
+ return;
}
+ datasketches::theta_union* union_ptr =
+ reinterpret_cast<datasketches::theta_union*>(dst->ptr);
+ union_ptr->update(*src_sketch);
}
StringVal AggregateFunctions::DsThetaUnionSerialize(
@@ -2023,7 +2026,11 @@ void AggregateFunctions::DsThetaUnionMerge(
DCHECK_EQ(dst->len, sizeof(datasketches::theta_union));
// Note, 'src' is a serialized compact_theta_sketch and not a serialized theta_union.
- auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+ std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+ if (!DeserializeDsSketch(src, &src_sketch)) {
+ LogSketchDeserializationError(ctx);
+ return;
+ }
datasketches::theta_union* dst_union_ptr =
reinterpret_cast<datasketches::theta_union*>(dst->ptr);
@@ -2063,15 +2070,14 @@ void AggregateFunctions::DsThetaIntersectUpdate(
if (src.is_null) return;
DCHECK(!dst->is_null);
DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
- try {
- auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
- if (src_sketch->is_empty()) return;
- datasketches::theta_intersection* intersection_ptr =
- reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
- intersection_ptr->update(*src_sketch);
- } catch (const std::exception&) {
+ std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+ if (!DeserializeDsSketch(src, &src_sketch)) {
LogSketchDeserializationError(ctx);
+ return;
}
+ datasketches::theta_intersection* intersection_ptr =
+ reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+ intersection_ptr->update(*src_sketch);
}
StringVal AggregateFunctions::DsThetaIntersectSerialize(
@@ -2093,7 +2099,11 @@ void AggregateFunctions::DsThetaIntersectMerge(
// Note, 'src' is a serialized compact_theta_sketch and not a serialized
// theta_intersection.
- auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+ std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+ if (!DeserializeDsSketch(src, &src_sketch)) {
+ LogSketchDeserializationError(ctx);
+ return;
+ }
datasketches::theta_intersection* dst_intersection_ptr =
reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
diff --git a/be/src/exprs/datasketches-common.cc b/be/src/exprs/datasketches-common.cc
index e76ad83..060277c 100644
--- a/be/src/exprs/datasketches-common.cc
+++ b/be/src/exprs/datasketches-common.cc
@@ -27,6 +27,7 @@ namespace impala {
using datasketches::hll_sketch;
using datasketches::kll_sketch;
using datasketches::theta_sketch;
+using datasketches::compact_theta_sketch;
using impala_udf::StringVal;
using std::stringstream;
using std::vector;
@@ -51,13 +52,14 @@ bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch) {
// This is a specialization of the template DeserializeDsSketch() for theta sketches.
template <>
-bool DeserializeDsSketch(
- const StringVal& serialized_sketch, theta_sketch::unique_ptr* sketch) {
- DCHECK(sketch != nullptr);
+bool DeserializeDsSketch(const StringVal& serialized_sketch,
+ std::unique_ptr<compact_theta_sketch>* sketch_ptr) {
+ DCHECK(sketch_ptr->get() == nullptr);
if (serialized_sketch.is_null || serialized_sketch.len == 0) return false;
try {
- *sketch =
- theta_sketch::deserialize((void*)serialized_sketch.ptr, serialized_sketch.len);
+ auto sketch = compact_theta_sketch::deserialize(
+ (void*)serialized_sketch.ptr, serialized_sketch.len);
+ *sketch_ptr = std::make_unique<compact_theta_sketch>(sketch);
return true;
} catch (const std::exception&) {
// One reason of throwing from deserialization is that the input string is not a
@@ -81,7 +83,7 @@ StringVal StringStreamToStringVal(FunctionContext* ctx, const stringstream& str_
bool update_sketch_to_theta_union(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_union& sketch) {
if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
- datasketches::theta_sketch::unique_ptr sketch_ptr;
+ std::unique_ptr<datasketches::compact_theta_sketch> sketch_ptr;
if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
LogSketchDeserializationError(ctx);
return false;
@@ -94,7 +96,7 @@ bool update_sketch_to_theta_union(FunctionContext* ctx,
bool update_sketch_to_theta_intersection(FunctionContext* ctx,
const StringVal& serialized_sketch, datasketches::theta_intersection& sketch) {
if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
- datasketches::theta_sketch::unique_ptr sketch_ptr;
+ std::unique_ptr<datasketches::compact_theta_sketch> sketch_ptr;
if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
LogSketchDeserializationError(ctx);
return false;
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index a7e7b74..1bf3deb 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -111,32 +111,26 @@ StringVal DataSketchesFunctions::DsHllStringify(FunctionContext* ctx,
BigIntVal DataSketchesFunctions::DsThetaEstimate(
FunctionContext* ctx, const StringVal& serialized_sketch) {
if (serialized_sketch.is_null || serialized_sketch.len == 0) return 0;
- try {
- // serialized_sketch may be a serialized of update_theta_sketch or
- // compact_theta_sketch
- auto sketch = datasketches::theta_sketch::deserialize(
- (void*)serialized_sketch.ptr, serialized_sketch.len);
- return sketch->get_estimate();
- } catch (const std::exception&) {
- // One reason of throwing from deserialization is that the input string is not a
- // serialized sketch.
+ std::unique_ptr<datasketches::compact_theta_sketch> sketch;
+ if (!DeserializeDsSketch(serialized_sketch, &sketch)) {
LogSketchDeserializationError(ctx);
return BigIntVal::null();
}
+ return sketch->get_estimate();
}
StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx,
const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
datasketches::theta_a_not_b a_not_b;
// Deserialize two sketches
- datasketches::theta_sketch::unique_ptr first_sketch_ptr;
+ std::unique_ptr<datasketches::compact_theta_sketch> first_sketch_ptr;
if (!first_serialized_sketch.is_null && first_serialized_sketch.len > 0) {
if (!DeserializeDsSketch(first_serialized_sketch, &first_sketch_ptr)) {
LogSketchDeserializationError(ctx);
return StringVal::null();
}
}
- datasketches::theta_sketch::unique_ptr second_sketch_ptr;
+ std::unique_ptr<datasketches::compact_theta_sketch> second_sketch_ptr;
if (!second_serialized_sketch.is_null && second_serialized_sketch.len > 0) {
if (!DeserializeDsSketch(second_serialized_sketch, &second_sketch_ptr)) {
LogSketchDeserializationError(ctx);