You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/04/08 12:01:55 UTC

[impala] 01/02: IMPALA-10632: Update the Theta sketch serialization interface

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ed0faaffb79557702b0ef0b952806bb632b62188
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Fri Mar 19 19:57:01 2021 +0800

    IMPALA-10632: Update the Theta sketch serialization interface
    
    DataSketches 3.0.0 removed serialization of update_theta_sketch. An
    update sketch is now compacted and serialized as a compact_theta_sketch,
    which keeps the serialized format backward compatible.
    
    Tests:
     - Ran the tests in tests/query_test/test_datasketches.py
    
    Change-Id: I80470863097a4836ee07fe44babaef0c852f3051
    Reviewed-on: http://gerrit.cloudera.org:8080/17261
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc    | 58 ++++++++++++++++++-------------
 be/src/exprs/datasketches-common.cc       | 16 +++++----
 be/src/exprs/datasketches-functions-ir.cc | 16 +++------
 3 files changed, 48 insertions(+), 42 deletions(-)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index f2a078a..e52b69d 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1645,7 +1645,7 @@ StringVal SerializeDsHllUnion(FunctionContext* ctx,
 /// Introducing this function in the .cc to avoid including the whole DataSketches Theta
 /// functionality into the header.
 StringVal SerializeDsThetaSketch(
-    FunctionContext* ctx, const datasketches::theta_sketch& sketch) {
+    FunctionContext* ctx, const datasketches::compact_theta_sketch& sketch) {
   std::stringstream serialized_input(std::ios::in | std::ios::out | std::ios::binary);
   sketch.serialize(serialized_input);
   return StringStreamToStringVal(ctx, serialized_input);
@@ -1898,8 +1898,8 @@ StringVal AggregateFunctions::DsThetaSerialize(
       || src.len == sizeof(datasketches::theta_union));
   StringVal dst;
   if (src.len == sizeof(datasketches::update_theta_sketch)) {
-    auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
-    dst = SerializeDsThetaSketch(ctx, *sketch_ptr);
+    auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
+    dst = SerializeDsThetaSketch(ctx, sketch_ptr->compact());
   } else {
     auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
     dst = SerializeDsThetaUnion(ctx, *union_ptr);
@@ -1915,9 +1915,12 @@ void AggregateFunctions::DsThetaMerge(
   DCHECK(dst->len == sizeof(datasketches::update_theta_sketch)
       or dst->len == sizeof(datasketches::theta_union));
 
-  // Note, 'src' is a serialized theta_sketch.
-  auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
-  if (src_sketch->is_empty()) return;
+  // Note, 'src' is a serialized compact_theta_sketch.
+  std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
 
   if (dst->len == sizeof(datasketches::theta_union)) {
     auto dst_union_ptr = reinterpret_cast<datasketches::theta_union*>(dst->ptr);
@@ -1951,7 +1954,7 @@ BigIntVal AggregateFunctions::DsThetaFinalize(
       or src.len == sizeof(datasketches::theta_union));
   BigIntVal estimate;
   if (src.len == sizeof(datasketches::update_theta_sketch)) {
-    auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
+    auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
     estimate = sketch_ptr->get_estimate();
   } else {
     auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
@@ -1968,8 +1971,8 @@ StringVal AggregateFunctions::DsThetaFinalizeSketch(
       or src.len == sizeof(datasketches::theta_union));
   StringVal result;
   if (src.len == sizeof(datasketches::update_theta_sketch)) {
-    auto sketch_ptr = reinterpret_cast<datasketches::theta_sketch*>(src.ptr);
-    result = SerializeDsThetaSketch(ctx, *sketch_ptr);
+    auto sketch_ptr = reinterpret_cast<datasketches::update_theta_sketch*>(src.ptr);
+    result = SerializeDsThetaSketch(ctx, sketch_ptr->compact());
   } else {
     auto union_ptr = reinterpret_cast<datasketches::theta_union*>(src.ptr);
     result = SerializeDsThetaUnion(ctx, *union_ptr);
@@ -1995,14 +1998,14 @@ void AggregateFunctions::DsThetaUnionUpdate(
   if (src.is_null) return;
   DCHECK(!dst->is_null);
   DCHECK_EQ(dst->len, sizeof(datasketches::theta_union));
-  try {
-    auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
-    datasketches::theta_union* union_ptr =
-        reinterpret_cast<datasketches::theta_union*>(dst->ptr);
-    union_ptr->update(*src_sketch);
-  } catch (const std::exception&) {
+  std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
     LogSketchDeserializationError(ctx);
+    return;
   }
+  datasketches::theta_union* union_ptr =
+      reinterpret_cast<datasketches::theta_union*>(dst->ptr);
+  union_ptr->update(*src_sketch);
 }
 
 StringVal AggregateFunctions::DsThetaUnionSerialize(
@@ -2023,7 +2026,11 @@ void AggregateFunctions::DsThetaUnionMerge(
   DCHECK_EQ(dst->len, sizeof(datasketches::theta_union));
 
   // Note, 'src' is a serialized compact_theta_sketch and not a serialized theta_union.
-  auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+  std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
 
   datasketches::theta_union* dst_union_ptr =
       reinterpret_cast<datasketches::theta_union*>(dst->ptr);
@@ -2063,15 +2070,14 @@ void AggregateFunctions::DsThetaIntersectUpdate(
   if (src.is_null) return;
   DCHECK(!dst->is_null);
   DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
-  try {
-    auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
-    if (src_sketch->is_empty()) return;
-    datasketches::theta_intersection* intersection_ptr =
-        reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
-    intersection_ptr->update(*src_sketch);
-  } catch (const std::exception&) {
+  std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
     LogSketchDeserializationError(ctx);
+    return;
   }
+  datasketches::theta_intersection* intersection_ptr =
+      reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+  intersection_ptr->update(*src_sketch);
 }
 
 StringVal AggregateFunctions::DsThetaIntersectSerialize(
@@ -2093,7 +2099,11 @@ void AggregateFunctions::DsThetaIntersectMerge(
 
   // Note, 'src' is a serialized compact_theta_sketch and not a serialized
   // theta_intersection.
-  auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+  std::unique_ptr<datasketches::compact_theta_sketch> src_sketch;
+  if (!DeserializeDsSketch(src, &src_sketch)) {
+    LogSketchDeserializationError(ctx);
+    return;
+  }
 
   datasketches::theta_intersection* dst_intersection_ptr =
       reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
diff --git a/be/src/exprs/datasketches-common.cc b/be/src/exprs/datasketches-common.cc
index e76ad83..060277c 100644
--- a/be/src/exprs/datasketches-common.cc
+++ b/be/src/exprs/datasketches-common.cc
@@ -27,6 +27,7 @@ namespace impala {
 using datasketches::hll_sketch;
 using datasketches::kll_sketch;
 using datasketches::theta_sketch;
+using datasketches::compact_theta_sketch;
 using impala_udf::StringVal;
 using std::stringstream;
 using std::vector;
@@ -51,13 +52,14 @@ bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch) {
 
 // This is a specialization of the template DeserializeDsSketch() for theta sketches.
 template <>
-bool DeserializeDsSketch(
-    const StringVal& serialized_sketch, theta_sketch::unique_ptr* sketch) {
-  DCHECK(sketch != nullptr);
+bool DeserializeDsSketch(const StringVal& serialized_sketch,
+    std::unique_ptr<compact_theta_sketch>* sketch_ptr) {
+  DCHECK(sketch_ptr->get() == nullptr);
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return false;
   try {
-    *sketch =
-        theta_sketch::deserialize((void*)serialized_sketch.ptr, serialized_sketch.len);
+    auto sketch = compact_theta_sketch::deserialize(
+        (void*)serialized_sketch.ptr, serialized_sketch.len);
+    *sketch_ptr = std::make_unique<compact_theta_sketch>(sketch);
     return true;
   } catch (const std::exception&) {
     // One reason of throwing from deserialization is that the input string is not a
@@ -81,7 +83,7 @@ StringVal StringStreamToStringVal(FunctionContext* ctx, const stringstream& str_
 bool update_sketch_to_theta_union(FunctionContext* ctx,
     const StringVal& serialized_sketch, datasketches::theta_union& sketch) {
   if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
-    datasketches::theta_sketch::unique_ptr sketch_ptr;
+    std::unique_ptr<datasketches::compact_theta_sketch> sketch_ptr;
     if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
       LogSketchDeserializationError(ctx);
       return false;
@@ -94,7 +96,7 @@ bool update_sketch_to_theta_union(FunctionContext* ctx,
 bool update_sketch_to_theta_intersection(FunctionContext* ctx,
     const StringVal& serialized_sketch, datasketches::theta_intersection& sketch) {
   if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
-    datasketches::theta_sketch::unique_ptr sketch_ptr;
+    std::unique_ptr<datasketches::compact_theta_sketch> sketch_ptr;
     if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
       LogSketchDeserializationError(ctx);
       return false;
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index a7e7b74..1bf3deb 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -111,32 +111,26 @@ StringVal DataSketchesFunctions::DsHllStringify(FunctionContext* ctx,
 BigIntVal DataSketchesFunctions::DsThetaEstimate(
     FunctionContext* ctx, const StringVal& serialized_sketch) {
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return 0;
-  try {
-    // serialized_sketch may be a serialized of update_theta_sketch or
-    // compact_theta_sketch
-    auto sketch = datasketches::theta_sketch::deserialize(
-        (void*)serialized_sketch.ptr, serialized_sketch.len);
-    return sketch->get_estimate();
-  } catch (const std::exception&) {
-    // One reason of throwing from deserialization is that the input string is not a
-    // serialized sketch.
+  std::unique_ptr<datasketches::compact_theta_sketch> sketch;
+  if (!DeserializeDsSketch(serialized_sketch, &sketch)) {
     LogSketchDeserializationError(ctx);
     return BigIntVal::null();
   }
+  return sketch->get_estimate();
 }
 
 StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx,
     const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
   datasketches::theta_a_not_b a_not_b;
   // Deserialize two sketches
-  datasketches::theta_sketch::unique_ptr first_sketch_ptr;
+  std::unique_ptr<datasketches::compact_theta_sketch> first_sketch_ptr;
   if (!first_serialized_sketch.is_null && first_serialized_sketch.len > 0) {
     if (!DeserializeDsSketch(first_serialized_sketch, &first_sketch_ptr)) {
       LogSketchDeserializationError(ctx);
       return StringVal::null();
     }
   }
-  datasketches::theta_sketch::unique_ptr second_sketch_ptr;
+  std::unique_ptr<datasketches::compact_theta_sketch> second_sketch_ptr;
   if (!second_serialized_sketch.is_null && second_serialized_sketch.len > 0) {
     if (!DeserializeDsSketch(second_serialized_sketch, &second_sketch_ptr)) {
       LogSketchDeserializationError(ctx);