You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/03/24 19:00:46 UTC

[impala] 02/02: IMPALA-10580: Implement ds_theta_union_f() function

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 622e3c95adca5cf30a0aff6542556feab9b8a861
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Fri Mar 12 16:11:13 2021 +0800

    IMPALA-10580: Implement ds_theta_union_f() function
    
    This function receives two strings that are serialized Apache
    DataSketches Theta sketches. It unions the two sketches and returns the
    resulting sketch.
    
    Example:
    select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2))
    from sketch_tbl;
    +-------------------------------------------------------+
    | ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) |
    +-------------------------------------------------------+
    | 15                                                    |
    +-------------------------------------------------------+
    
    Change-Id: I8329979b81ceeaad739a43fab79768ca9c2916fa
    Reviewed-on: http://gerrit.cloudera.org:8080/17179
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/datasketches-functions-ir.cc          | 31 +++++++++
 be/src/exprs/datasketches-functions.h              |  7 ++
 common/function-registry/impala_functions.py       |  2 +
 .../queries/QueryTest/datasketches-theta.test      | 74 ++++++++++++++++++++++
 4 files changed, 114 insertions(+)

diff --git a/be/src/exprs/datasketches-functions-ir.cc b/be/src/exprs/datasketches-functions-ir.cc
index bf0eb1a..3a3fa4a 100644
--- a/be/src/exprs/datasketches-functions-ir.cc
+++ b/be/src/exprs/datasketches-functions-ir.cc
@@ -21,6 +21,7 @@
 #include "gutil/strings/substitute.h"
 #include "thirdparty/datasketches/hll.hpp"
 #include "thirdparty/datasketches/theta_sketch.hpp"
+#include "thirdparty/datasketches/theta_union.hpp"
 #include "thirdparty/datasketches/theta_a_not_b.hpp"
 #include "thirdparty/datasketches/kll_sketch.hpp"
 #include "udf/udf-internal.h"
@@ -159,6 +160,36 @@ StringVal DataSketchesFunctions::DsThetaExclude(FunctionContext* ctx,
   return StringVal::null();
 }
 
+// Merges a serialized sketch into 'union_sketch'; logs and returns false if invalid.
+bool update_sketch_to_theta_union(FunctionContext* ctx,
+    const StringVal& serialized_sketch, datasketches::theta_union& union_sketch) {
+  if (serialized_sketch.is_null || serialized_sketch.len == 0) return true;
+  datasketches::theta_sketch::unique_ptr deserialized;
+  if (!DeserializeDsSketch(serialized_sketch, &deserialized)) {
+    LogSketchDeserializationError(ctx);
+    return false;
+  }
+  union_sketch.update(*deserialized);
+  return true;
+}
+
+// Returns the serialized union of two serialized Theta sketches; NULL/empty inputs are
+// skipped. Returns NULL if either input fails to deserialize (the helper logs it).
+StringVal DataSketchesFunctions::DsThetaUnionF(FunctionContext* ctx,
+    const StringVal& first_serialized_sketch, const StringVal& second_serialized_sketch) {
+  datasketches::theta_union merged = datasketches::theta_union::builder().build();
+  // '&&' short-circuits, so the second input is not touched once the first is bad.
+  const bool ok =
+      update_sketch_to_theta_union(ctx, first_serialized_sketch, merged) &&
+      update_sketch_to_theta_union(ctx, second_serialized_sketch, merged);
+  if (!ok) return StringVal::null();
+  // Take the compact result of the union and serialize it into the returned value.
+  datasketches::compact_theta_sketch result = merged.get_result();
+  std::stringstream serialized_output;
+  result.serialize(serialized_output);
+  return StringStreamToStringVal(ctx, serialized_output);
+}
+
 FloatVal DataSketchesFunctions::DsKllQuantile(FunctionContext* ctx,
     const StringVal& serialized_sketch, const DoubleVal& rank) {
   if (serialized_sketch.is_null || serialized_sketch.len == 0) return FloatVal::null();
diff --git a/be/src/exprs/datasketches-functions.h b/be/src/exprs/datasketches-functions.h
index 26f276c..15d9774 100644
--- a/be/src/exprs/datasketches-functions.h
+++ b/be/src/exprs/datasketches-functions.h
@@ -78,6 +78,13 @@ public:
       const StringVal& first_serialized_sketch,
       const StringVal& second_serialized_sketch);
 
+  /// 'first_serialized_sketch' and 'second_serialized_sketch' are expected to be
+  /// serialized Apache DataSketches Theta sketches; otherwise the query fails. NULL or
+  /// empty inputs are ignored. Returns the serialized union of the two sketches.
+  static StringVal DsThetaUnionF(FunctionContext* ctx,
+      const StringVal& first_serialized_sketch,
+      const StringVal& second_serialized_sketch);
+
   /// 'serialized_sketch' is expected as a serialized Apache DataSketches KLL sketch. If
   /// it is not, then the query fails. 'rank' is used to identify which item (estimate)
   /// to return from the sketched dataset. E.g. 0.1 means the item where 10% of the
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index ee9f87f..e46ef89 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -1007,6 +1007,8 @@ visible_functions = [
      '_ZN6impala21DataSketchesFunctions15DsThetaEstimateEPN10impala_udf15FunctionContextERKNS1_9StringValE'],
   [['ds_theta_exclude'], 'STRING', ['STRING', 'STRING'],
      '_ZN6impala21DataSketchesFunctions14DsThetaExcludeEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
+  [['ds_theta_union_f'], 'STRING', ['STRING', 'STRING'],
+     '_ZN6impala21DataSketchesFunctions13DsThetaUnionFEPN10impala_udf15FunctionContextERKNS1_9StringValES6_'],
   [['ds_kll_quantile'], 'FLOAT', ['STRING', 'DOUBLE'],
       '_ZN6impala21DataSketchesFunctions13DsKllQuantileEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_9DoubleValE'],
   [['ds_kll_n'], 'BIGINT', ['STRING'],
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
index 34a3a0e..27a0c06 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
@@ -407,4 +407,78 @@ select ds_theta_estimate(ds_theta_exclude(sketch1, sketch2)) from sketch_interme
 BIGINT
 ---- RESULTS
 5
+====
+---- QUERY
+# Checks that unioning a valid sketch with a null value results in the valid sketch
+# being returned.
+select
+    ds_theta_estimate(ds_theta_union_f(date_sketch, null)),
+    ds_theta_estimate(ds_theta_union_f(null, float_sketch))
+from sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+3,10
+3,10
+3,10
+3,10
+====
+---- QUERY
+# Check that ds_theta_union_f() returns an empty sketch for an empty sketch.
+select ds_theta_estimate(ds_theta_union_f(ds_theta_sketch(cast(f2 as float)), null))
+from functional_parquet.emptytable;
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# Checks that ds_theta_union_f() returns an empty sketch for NULL inputs.
+select ds_theta_estimate(ds_theta_union_f(null_str, some_nulls)) from
+functional_parquet.nullrows where id='b';
+---- TYPES
+BIGINT
+---- RESULTS
+0
+====
+---- QUERY
+# ds_theta_union_f() returns an error if it receives an invalid serialized sketch.
+select ds_theta_union_f(null, date_string_col) from functional_parquet.alltypestiny
+where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# ds_theta_union_f() returns an error if it receives an invalid serialized sketch.
+select ds_theta_union_f(date_string_col, null) from functional_parquet.alltypestiny
+where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Unions the sketches from theta_sketches_impala_hive2 and checks if the union produces
+# the same result as if these sketches were used separately to get the estimates.
+select
+    ds_theta_estimate(ds_theta_union_f(i_ti, h_ti)) as ti,
+    ds_theta_estimate(ds_theta_union_f(i_i, h_i)) as i,
+    ds_theta_estimate(ds_theta_union_f(i_bi, h_bi)) as bi,
+    ds_theta_estimate(ds_theta_union_f(i_f, h_f)) as f,
+    ds_theta_estimate(ds_theta_union_f(i_d, h_d)) as d,
+    ds_theta_estimate(ds_theta_union_f(i_s, h_s)) as s,
+    ds_theta_estimate(ds_theta_union_f(i_c, h_c)) as c,
+    ds_theta_estimate(ds_theta_union_f(i_v, h_v)) as v,
+    ds_theta_estimate(ds_theta_union_f(i_nc, h_nc)) as nc
+from theta_sketches_impala_hive2;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
+---- QUERY
+# Union two sketches from different columns of sketch_intermediate.
+select ds_theta_estimate(ds_theta_union_f(sketch1, sketch2)) from sketch_intermediate;
+---- TYPES
+BIGINT
+---- RESULTS
+15
 ====
\ No newline at end of file