You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/13 02:24:27 UTC

[impala] 03/08: IMPALA-10520: Implement ds_theta_intersect() function

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0d22e89df452d3e21912b60c72a087a3e8c38057
Author: Fucun Chu <ch...@hotmail.com>
AuthorDate: Thu Feb 4 15:24:25 2021 +0800

    IMPALA-10520: Implement ds_theta_intersect() function
    
    This function receives a set of serialized Apache DataSketches Theta
    sketches produced by ds_theta_sketch() and intersects them into a
    single sketch.
    
    An example usage is to create a sketch for each partition of a table,
    write these sketches to a separate table and intersect the sketches of
    the partitions the user is interested in to get estimates about the
    values those partitions have in common. E.g.:
      SELECT
          ds_theta_estimate(ds_theta_intersect(sketch_col))
      FROM sketch_tbl
      WHERE partition_col=1 OR partition_col=5;
    
    Testing:
      - Apart from the automated tests I added to this patch, I also
        tested ds_theta_intersect() on a bigger dataset to check that
        the serialization, deserialization and merging steps work well. I
        took TPCH25.lineitem, created a number of sketches grouped by
        l_shipdate and called ds_theta_intersect() on those sketches.
    
    Change-Id: I80e68c2151c4604f0386d3dfb004c82b10293f97
    Reviewed-on: http://gerrit.cloudera.org:8080/17088
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/aggregate-functions-ir.cc             | 87 ++++++++++++++++++++++
 be/src/exprs/aggregate-functions.h                 |  9 +++
 .../java/org/apache/impala/catalog/BuiltinsDb.java | 14 ++++
 .../queries/QueryTest/datasketches-theta.test      | 72 ++++++++++++++++++
 4 files changed, 182 insertions(+)

diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 23ed6d3..f2a078a 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -42,6 +42,7 @@
 #include "thirdparty/datasketches/hll.hpp"
 #include "thirdparty/datasketches/theta_sketch.hpp"
 #include "thirdparty/datasketches/theta_union.hpp"
+#include "thirdparty/datasketches/theta_intersection.hpp"
 #include "thirdparty/datasketches/kll_sketch.hpp"
 #include "util/arithmetic-util.h"
 #include "util/mpfit-util.h"
@@ -1660,6 +1661,22 @@ StringVal SerializeDsThetaUnion(
   return SerializeDsThetaSketch(ctx, sketch);
 }
 
+/// Auxiliary function that receives a theta_intersection and returns its result as a
+/// serialized, compacted Theta sketch wrapped into a StringVal. Returns a NULL StringVal
+/// if the intersection has no result, i.e. update() was never called on it.
+/// Introducing this function in the .cc to avoid including the whole DataSketches Theta
+/// functionality into the header.
+StringVal SerializeDsThetaIntersection(
+    FunctionContext* ctx, const datasketches::theta_intersection& ds_intersection) {
+  // Calling get_result() before calling update() is undefined, so check has_result()
+  // first.
+  if (ds_intersection.has_result()) {
+    datasketches::compact_theta_sketch sketch = ds_intersection.get_result();
+    return SerializeDsThetaSketch(ctx, sketch);
+  }
+  return StringVal::null();
+}
+
 /// Auxiliary function that receives a kll_sketch<float> and returns the serialized
 /// version of it wrapped into a StringVal.
 /// Introducing this function in the .cc to avoid including the whole DataSketches HLL
@@ -2030,6 +2047,76 @@ StringVal AggregateFunctions::DsThetaUnionFinalize(
   return result;
 }
 
+void AggregateFunctions::DsThetaIntersectInit(FunctionContext* ctx, StringVal* slot) {
+  AllocBuffer(ctx, slot, sizeof(datasketches::theta_intersection));
+  if (UNLIKELY(slot->is_null)) {
+    DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+    return;
+  }
+  // 'slot' holds raw memory, so construct the intersection in place with placement new
+  // rather than assigning to an object whose constructor never ran.
+  new (slot->ptr) datasketches::theta_intersection();
+}
+
+void AggregateFunctions::DsThetaIntersectUpdate(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
+  try {
+    auto src_sketch = datasketches::theta_sketch::deserialize((void*)src.ptr, src.len);
+    datasketches::theta_intersection* intersection_ptr =
+        reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+    // Don't skip empty sketches: intersecting with the empty set yields an empty result.
+    intersection_ptr->update(*src_sketch);
+  } catch (const std::exception&) {
+    LogSketchDeserializationError(ctx);
+  }
+}
+
+StringVal AggregateFunctions::DsThetaIntersectSerialize(
+    FunctionContext* ctx, const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::theta_intersection));
+  auto* intersection_ptr = reinterpret_cast<datasketches::theta_intersection*>(src.ptr);
+  StringVal dst = SerializeDsThetaIntersection(ctx, *intersection_ptr);
+  intersection_ptr->~theta_intersection();  // Free() alone would leak its storage.
+  ctx->Free(src.ptr);
+  return dst;
+}
+
+void AggregateFunctions::DsThetaIntersectMerge(
+    FunctionContext* ctx, const StringVal& src, StringVal* dst) {
+  // A NULL intermediate carries no result (see SerializeDsThetaIntersection()), so it
+  // must not constrain the merged intersection.
+  if (src.is_null) return;
+  DCHECK(!dst->is_null);
+  DCHECK_EQ(dst->len, sizeof(datasketches::theta_intersection));
+
+  auto* merged = reinterpret_cast<datasketches::theta_intersection*>(dst->ptr);
+  // 'src' was produced by DsThetaIntersectSerialize(): it is a serialized
+  // compact_theta_sketch, not a serialized theta_intersection.
+  const auto partial =
+      datasketches::theta_sketch::deserialize(static_cast<void*>(src.ptr), src.len);
+  merged->update(*partial);
+}
+
+StringVal AggregateFunctions::DsThetaIntersectFinalize(
+    FunctionContext* ctx, const StringVal& src) {
+  DCHECK(!src.is_null);
+  DCHECK_EQ(src.len, sizeof(datasketches::theta_intersection));
+  auto* intersection_ptr = reinterpret_cast<datasketches::theta_intersection*>(src.ptr);
+  // NULL if update() was never called on the intersection (e.g. all inputs were NULL),
+  // otherwise the serialized, compacted result sketch. This is the same conversion
+  // DsThetaIntersectSerialize() does for the intermediate value.
+  StringVal result = SerializeDsThetaIntersection(ctx, *intersection_ptr);
+  // Run the destructor before freeing the slot: Free() only releases the slot's bytes
+  // and would leak any storage the intersection allocated internally.
+  intersection_ptr->~theta_intersection();
+  ctx->Free(src.ptr);
+  return result;
+}
+
 void AggregateFunctions::DsKllInitHelper(FunctionContext* ctx, StringVal* slot) {
   AllocBuffer(ctx, slot, sizeof(datasketches::kll_sketch<float>));
   if (UNLIKELY(slot->is_null)) {
diff --git a/be/src/exprs/aggregate-functions.h b/be/src/exprs/aggregate-functions.h
index 474e23d..104eda8 100644
--- a/be/src/exprs/aggregate-functions.h
+++ b/be/src/exprs/aggregate-functions.h
@@ -266,6 +266,15 @@ class AggregateFunctions {
   static void DsThetaUnionMerge(FunctionContext*, const StringVal& src, StringVal* dst);
   static StringVal DsThetaUnionFinalize(FunctionContext*, const StringVal& src);
 
+  /// These functions implement ds_theta_intersect(), which intersects Theta sketches.
+  static void DsThetaIntersectInit(FunctionContext*, StringVal* slot);
+  static void DsThetaIntersectUpdate(
+      FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsThetaIntersectSerialize(FunctionContext*, const StringVal& src);
+  static void DsThetaIntersectMerge(
+      FunctionContext*, const StringVal& src, StringVal* dst);
+  static StringVal DsThetaIntersectFinalize(FunctionContext*, const StringVal& src);
+
   /// These functions implement Apache DataSketches KLL support for sketching.
   static void DsKllInit(FunctionContext*, StringVal* slot);
   static void DsKllUpdate(FunctionContext*, const FloatVal& src, StringVal* dst);
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index 6a1a609..fa76cf1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1434,6 +1434,20 @@ public class BuiltinsDb extends Db {
             "20DsThetaUnionFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
         true, false, true));
 
+    // DataSketches Theta intersect. Input, intermediate and result are all STRINGs.
+    db.addBuiltin(AggregateFunction.createBuiltin(db, "ds_theta_intersect",
+        Lists.<Type>newArrayList(Type.STRING), Type.STRING, Type.STRING,
+        prefix + "20DsThetaIntersectInitEPN10impala_udf15FunctionContextEPNS1_9StringValE",
+        prefix +
+            "22DsThetaIntersectUpdateEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "21DsThetaIntersectMergeEPN10impala_udf15FunctionContextERKNS1_9StringValEPS4_",
+        prefix +
+            "25DsThetaIntersectSerializeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        prefix +
+            "24DsThetaIntersectFinalizeEPN10impala_udf15FunctionContextERKNS1_9StringValE",
+        true, false, true));  // TODO(review): NULL on empty input; check last flag.
+
     for (Type t: Type.getSupportedTypes()) {
       if (t.isNull()) continue; // NULL is handled through type promotion.
       if (t.isScalarType(PrimitiveType.CHAR)) continue; // promoted to STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
index 84ae7f9..fe7da72 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/datasketches-theta.test
@@ -233,3 +233,75 @@ BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
 ---- RESULTS
 5,7,6,6,7,4,4,3,0
 ====
+---- QUERY
+# Intersect the sketches in sketch_store and check.
+select
+    ds_theta_estimate(ds_theta_intersect(date_sketch)),
+    ds_theta_estimate(ds_theta_intersect(float_sketch))
+from sketch_store;
+---- TYPES
+BIGINT,BIGINT
+---- RESULTS
+0,10
+====
+---- QUERY
+# Checks that ds_theta_intersect() produces NULL for an empty dataset.
+select ds_theta_intersect(field) from functional_parquet.emptytable;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Checks that ds_theta_intersect() produces NULL for NULL inputs.
+select ds_theta_intersect(null_str) from functional_parquet.nullrows;
+---- TYPES
+STRING
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# ds_theta_intersect() returns an error if it receives an invalid serialized sketch.
+select ds_theta_intersect(date_string_col) from functional_parquet.alltypestiny
+where id=1;
+---- CATCH
+UDF ERROR: Unable to deserialize sketch.
+====
+---- QUERY
+# Checks that intersecting the sketches made by Hive gives the same estimates as
+# estimating these sketches directly.
+select
+    ds_theta_estimate(ds_theta_intersect(ti)) as ti,
+    ds_theta_estimate(ds_theta_intersect(i)) as i,
+    ds_theta_estimate(ds_theta_intersect(bi)) as bi,
+    ds_theta_estimate(ds_theta_intersect(f)) as f,
+    ds_theta_estimate(ds_theta_intersect(d)) as d,
+    ds_theta_estimate(ds_theta_intersect(s)) as s,
+    ds_theta_estimate(ds_theta_intersect(c)) as c,
+    ds_theta_estimate(ds_theta_intersect(v)) as v,
+    ds_theta_estimate(ds_theta_intersect(nc)) as nc
+from theta_sketches_from_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
+---- QUERY
+# Intersects the sketches from theta_sketches_impala_hive and checks that the
+# intersection produces the same estimates as using these sketches directly.
+select
+    ds_theta_estimate(ds_theta_intersect(ti)) as ti,
+    ds_theta_estimate(ds_theta_intersect(i)) as i,
+    ds_theta_estimate(ds_theta_intersect(bi)) as bi,
+    ds_theta_estimate(ds_theta_intersect(f)) as f,
+    ds_theta_estimate(ds_theta_intersect(d)) as d,
+    ds_theta_estimate(ds_theta_intersect(s)) as s,
+    ds_theta_estimate(ds_theta_intersect(c)) as c,
+    ds_theta_estimate(ds_theta_intersect(v)) as v,
+    ds_theta_estimate(ds_theta_intersect(nc)) as nc
+from theta_sketches_impala_hive;
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+5,7,6,6,7,4,4,3,0
+====
\ No newline at end of file