You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/30 01:30:45 UTC

[impala] 01/04: IMPALA-10581: Implement ds_theta_intersect_f() function

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 77d6acd032333d54063b43db25d35b5f29fc2c38
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Fri Mar 12 16:59:05 2021 +0800

    IMPALA-10581: Implement ds_theta_intersect_f() function
    
    This function receives two strings that are serialized Apache
    DataSketches Theta sketches. It computes the intersection of the two
    sketches, which may come from the same or different columns, and returns
    the resulting intersection sketch.
    
    Example:
    select ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2))
    from sketch_tbl;
    +-----------------------------------------------------------+
    | ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2)) |
    +-----------------------------------------------------------+
    | 5                                                         |
    +-----------------------------------------------------------+
    
    Change-Id: I335eada00730036d5433775cfe673e0e4babaa01
    Reviewed-on: http://gerrit.cloudera.org:8080/17186
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/datasketches-common.cc                | 27 +++++++
 be/src/exprs/datasketches-common.h                 | 15 ++++
 be/src/exprs/datasketches-functions-ir.cc          | 33 +++++----
 be/src/exprs/datasketches-functions.h              |  8 ++
 common/function-registry/impala_functions.py       |  2 +
 .../queries/QueryTest/datasketches-theta.test      | 85 ++++++++++++++++++++++
 6 files changed, 157 insertions(+), 13 deletions(-)

diff --git a/be/src/exprs/datasketches-common.cc b/be/src/exprs/datasketches-common.cc
index c80bd5b..e76ad83 100644
--- a/be/src/exprs/datasketches-common.cc
+++ b/be/src/exprs/datasketches-common.cc
@@ -78,6 +78,33 @@ StringVal StringStreamToStringVal(FunctionContext* ctx, const stringstream& str_
   return dst;
 }
 
+bool update_sketch_to_theta_union(FunctionContext* ctx,
+    const StringVal& serialized_sketch, datasketches::theta_union& sketch) {
+  if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
+    datasketches::theta_sketch::unique_ptr sketch_ptr;
+    if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
+      LogSketchDeserializationError(ctx);
+      return false;
+    }
+    sketch.update(*sketch_ptr);
+  }
+  return true;  // A null or empty input is skipped and still counts as success.
+}
+
+bool update_sketch_to_theta_intersection(FunctionContext* ctx,
+    const StringVal& serialized_sketch, datasketches::theta_intersection& sketch) {
+  if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
+    datasketches::theta_sketch::unique_ptr sketch_ptr;
+    if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
+      LogSketchDeserializationError(ctx);
+      return false;
+    }
+    sketch.update(*sketch_ptr);
+    return true;
+  }
+  return false;  // Null or empty input: report false (the union helper returns true).
+}
+
 template<class T>
 StringVal DsKllVectorResultToStringVal(FunctionContext* ctx,
     const vector<T>& kll_result) {
diff --git a/be/src/exprs/datasketches-common.h b/be/src/exprs/datasketches-common.h
index 2f2b4d5..20a7863 100644
--- a/be/src/exprs/datasketches-common.h
+++ b/be/src/exprs/datasketches-common.h
@@ -22,6 +22,8 @@
 
 #include "common/status.h"
 #include "thirdparty/datasketches/hll.hpp"
+#include "thirdparty/datasketches/theta_union.hpp"
+#include "thirdparty/datasketches/theta_intersection.hpp"
 #include "udf/udf.h"
 
 namespace impala {
@@ -59,6 +61,19 @@ bool DeserializeDsSketch(const StringVal& serialized_sketch, T* sketch)
 StringVal StringStreamToStringVal(FunctionContext* ctx,
     const std::stringstream& str_stream);
 
+/// Helper function that receives a serialized DataSketches Theta sketch in
+/// 'serialized_sketch', deserializes it and updates 'sketch' with the deserialized
+/// sketch. A null or empty input is skipped. Returns false if the deserialization fails
+/// (the error log will be written), true otherwise.
+bool update_sketch_to_theta_union(FunctionContext* ctx,
+    const StringVal& serialized_sketch, datasketches::theta_union& sketch);
+
+/// Helper function that receives a serialized DataSketches Theta sketch in
+/// 'serialized_sketch', deserializes it and updates 'sketch' with the deserialized
+/// sketch. Returns false if 'serialized_sketch' is null or empty, or if deserialization
+/// fails (the error log will be written), true otherwise.
+bool update_sketch_to_theta_intersection(FunctionContext* ctx,
+    const StringVal& serialized_sketch, datasketches::theta_intersection& sketch);
 
 /// Helper function that receives a vector and returns a comma separated StringVal that
 /// holds the items from the vector keeping the order.
diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index 3a3fa4a..a7e7b74 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -22,6 +22,7 @@
 #include "thirdparty/datasketches/hll.hpp"
 #include "thirdparty/datasketches/theta_sketch.hpp"
 #include "thirdparty/datasketches/theta_union.hpp"
+#include "thirdparty/datasketches/theta_intersection.hpp"
 #include "thirdparty/datasketches/theta_a_not_b.hpp"
 #include "thirdparty/datasketches/kll_sketch.hpp"
 #include "udf/udf-internal.h"
@@ -160,19 +161,6 @@ StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx,
   return StringVal::null();
 }
 
-bool update_sketch_to_theta_union(FunctionContext* ctx,
-    const StringVal& serialized_sketch, datasketches::theta_union& union_sketch) {
-  if (!serialized_sketch.is_null && serialized_sketch.len > 0) {
-    datasketches::theta_sketch::unique_ptr sketch_ptr;
-    if (!DeserializeDsSketch(serialized_sketch, &sketch_ptr)) {
-      LogSketchDeserializationError(ctx);
-      return false;
-    }
-    union_sketch.update(*sketch_ptr);
-  }
-  return true;
-}
-
 StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx,
     const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
   datasketches::theta_union union_sketch = datasketches::theta_union::builder().build();
@@ -190,6 +178,25 @@ StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx,
   return StringStreamToStringVal(ctx, serialized_input);
 }
 
+StringVal DataSketchesFunctions::DsThetaIntersectF(FunctionContext* ctx,
+    const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
+  datasketches::theta_intersection intersection_sketch;
+  // Feed both input sketches into the intersection, first then second.
+  // If either input is null, empty or cannot be deserialized, null is returned.
+  if (!update_sketch_to_theta_intersection(
+          ctx, first_serialized_sketch, intersection_sketch)) {
+    return StringVal::null();
+  }
+  if (!update_sketch_to_theta_intersection(
+          ctx, second_serialized_sketch, intersection_sketch)) {
+    return StringVal::null();
+  }
+  datasketches::compact_theta_sketch sketch = intersection_sketch.get_result();
+  std::stringstream serialized_input;
+  sketch.serialize(serialized_input);
+  return StringStreamToStringVal(ctx, serialized_input);
+}
+
 FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
     const StringVal& serialized_sketch, const DoubleVal& rank) {
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null();
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index 15d9774..7013945 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -85,6 +85,14 @@ public:
       const StringVal& first_serialized_sketch,
       const StringVal& second_serialized_sketch);
 
+  /// 'first_serialized_sketch' and 'second_serialized_sketch' are both expected as
+  /// serialized Apache DataSketches Theta sketches. If they are not, then the query
+  /// fails. Intersects the two sketches and returns the resulting serialized sketch, or
+  /// NULL if either input is NULL or empty.
+  static StringVal DsThetaIntersectF(FunctionContext* ctx,
+      const StringVal& first_serialized_sketch,
+      const StringVal& second_serialized_sketch);
+
   /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
   /// it is not, then the query fails. 'rank' is used to identify which item (estimate)
   /// to return from the sketched dataset. E.g. 0.1 means the item where 10% of the
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index e46ef89..fde4c71 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -1009,6 +1009,8 @@ visible_functions = [
      '_ZN6impala21DataSketchesFunctions14DsThetaExcludeEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['ds_theta_union_f'], 'STRING', ['STRING', 'STRING'],
      '_ZN6impala21DataSketchesFunctions13DsThetaUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
+  [['ds_theta_intersect_f'], 'STRING', ['STRING', 'STRING'],
+     '_ZN6impala21DataSketchesFunctions17DsThetaIntersectFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
       '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
   [['ds_kll_n'], 'BIGINT', ['STRING'],
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
index 27a0c06..cc03b22 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
@@ -481,4 +481,89 @@ select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) from sketch_interme
 BIGINT
 ---- RESULTS
 15
+====
+---- QUERY
+# Check that the intersection of a valid sketch with a null value returns a null sketch.
+select
+    ds_theta_estimate(ds_theta_intersect_f(date_sketch, null)),
+    ds_theta_estimate(ds_theta_intersect_f(null, float_sketch))
+from sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+0,0
+0,0
+0,0
+0,0
+====
+---- QUERY
+# Check that ds_theta_intersect_f() returns an empty sketch for 2 empty sketches.
+select ds_theta_estimate(ds_theta_intersect_f(
+ds_theta_sketch(cast(f2 as float)), ds_theta_sketch(cast(f2 as float))))
+from functional_parquet.emptytable;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# Check that ds_theta_intersect_f() returns a null sketch for 2 null inputs.
+select ds_theta_estimate(ds_theta_intersect_f(null_str, some_nulls)) from
+functional_parquet.nullrows where id='b';
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# ds_theta_intersect_f() returns an error if it receives an invalid serialized sketch.
+select ds_theta_intersect_f(date_string_col, null) from functional_parquet.alltypestiny
+where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# ds_theta_intersect_f() returns an error if it receives an invalid serialized sketch.
+select ds_theta_intersect_f(sketch1, sketch2) from (
+select ds_theta_sketch(float_col) sketch1, max(date_string_col) sketch2 from
+functional_parquet.alltypestiny
+) t
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Intersects the sketches from theta_sketches_impala_hive2 and checks that the result
+# is the same as if these sketches were used separately to get the estimates.
+select
+    ds_theta_estimate(ds_theta_intersect_f(i_ti, h_ti)) as ti,
+    ds_theta_estimate(ds_theta_intersect_f(i_i, h_i)) as i,
+    ds_theta_estimate(ds_theta_intersect_f(i_bi, h_bi)) as bi,
+    ds_theta_estimate(ds_theta_intersect_f(i_f, h_f)) as f,
+    ds_theta_estimate(ds_theta_intersect_f(i_d, h_d)) as d,
+    ds_theta_estimate(ds_theta_intersect_f(i_s, h_s)) as s,
+    ds_theta_estimate(ds_theta_intersect_f(i_c, h_c)) as c,
+    ds_theta_estimate(ds_theta_intersect_f(i_v, h_v)) as v,
+    ds_theta_estimate(ds_theta_intersect_f(i_nc, h_nc)) as nc
+from theta_sketches_impala_hive2;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
+---- QUERY
+# Check that the non-empty input sketches are distinct and the result is empty.
+select ds_theta_estimate(ds_theta_intersect_f(date_sketch, float_sketch))
+from sketch_store where year=2009 and month=1;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# Check the case when the inputs aren't the same but have some (but not all) items in common.
+select ds_theta_estimate(ds_theta_intersect_f(sketch1, sketch2)) from sketch_intermediate;
+---- TYPES
+BIGINT
+---- RESULTS
+5
 ====
\ No newline at end of file