You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/07/27 18:50:44 UTC

[pinot] branch master updated: add theta sketch scalar (#11153)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b85df331 add theta sketch scalar (#11153)
11b85df331 is described below

commit 11b85df3318ab414a7dff30e2b1d979259b984fc
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Jul 27 11:50:36 2023 -0700

    add theta sketch scalar (#11153)
    
    * add theta sketch scalar functions
    * fix sketch name to add "theta" specifically to differentiate other sketch types
    
    ---------
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../core/function/scalar/SketchFunctions.java      | 92 ++++++++++++++++++++++
 .../src/test/resources/queries/UDFAggregates.json  | 20 +++++
 2 files changed, 112 insertions(+)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index f6245bec6f..13577e7beb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -20,8 +20,15 @@ package org.apache.pinot.core.function.scalar;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import java.math.BigDecimal;
+import java.util.Base64;
 import javax.annotation.Nullable;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.AnotB;
+import org.apache.datasketches.theta.Intersection;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.Union;
 import org.apache.datasketches.theta.UpdateSketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSketch;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -65,6 +72,8 @@ import org.apache.pinot.spi.utils.CommonConstants;
  * }
  */
 public class SketchFunctions {
+  private static final SetOperationBuilder SET_OPERATION_BUILDER = new SetOperationBuilder();
+
   private SketchFunctions() {
   }
 
@@ -184,4 +193,87 @@ public class SketchFunctions {
     }
     return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
   }
+
+  @ScalarFunction(names = {"getThetaSketchEstimate", "get_theta_sketch_estimate"})
+  public static long getThetaSketchEstimate(Object sketchObject) {
+    return Math.round(asThetaSketch(sketchObject).getEstimate());
+  }
+
+  @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+  public static Sketch thetaSketchUnion(Object o1, Object o2) {
+    return thetaSketchUnionVar(o1, o2);
+  }
+
+  @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+  public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3) {
+    return thetaSketchUnionVar(o1, o2, o3);
+  }
+
+  @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+  public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4) {
+    return thetaSketchUnionVar(o1, o2, o3, o4);
+  }
+
+  @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+  public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4, Object o5) {
+    return thetaSketchUnionVar(o1, o2, o3, o4, o5);
+  }
+
+  @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+  public static Sketch thetaSketchIntersect(Object o1, Object o2) {
+    return thetaSketchIntersectVar(o1, o2);
+  }
+
+  @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+  public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3) {
+    return thetaSketchIntersectVar(o1, o2, o3);
+  }
+
+  @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+  public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4) {
+    return thetaSketchIntersectVar(o1, o2, o3, o4);
+  }
+
+  @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+  public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4, Object o5) {
+    return thetaSketchIntersectVar(o1, o2, o3, o4, o5);
+  }
+
+  /**
+   * Computes the A-not-B set difference of two theta sketches.
+   *
+   * @param sketchObjectA the sketch set as the "A" operand, in any representation accepted by {@code asThetaSketch}
+   * @param sketchObjectB the sketch passed to {@code notB}, in any representation accepted by {@code asThetaSketch}
+   * @return the result obtained from the {@link AnotB} operation
+   */
+  @ScalarFunction(names = {"thetaSketchDiff", "theta_sketch_diff"})
+  public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB) {
+    AnotB aNotB = SET_OPERATION_BUILDER.buildANotB();
+    Sketch sketchA = asThetaSketch(sketchObjectA);
+    aNotB.setA(sketchA);
+    Sketch sketchB = asThetaSketch(sketchObjectB);
+    aNotB.notB(sketchB);
+    return aNotB.getResult(false, null, false);
+  }
+
+  /**
+   * Feeds every argument, converted with {@code asThetaSketch}, into a freshly built {@link Union} in argument order
+   * and returns the union's result.
+   */
+  private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
+    Union merged = SET_OPERATION_BUILDER.buildUnion();
+    for (int i = 0; i < sketchObjects.length; i++) {
+      Sketch next = asThetaSketch(sketchObjects[i]);
+      merged.union(next);
+    }
+    return merged.getResult(false, null);
+  }
+
+  /**
+   * Feeds every argument, converted with {@code asThetaSketch}, into a freshly built {@link Intersection} in
+   * argument order and returns the intersection's result.
+   */
+  private static Sketch thetaSketchIntersectVar(Object... sketchObjects) {
+    Intersection common = SET_OPERATION_BUILDER.buildIntersection();
+    for (int i = 0; i < sketchObjects.length; i++) {
+      Sketch next = asThetaSketch(sketchObjects[i]);
+      common.intersect(next);
+    }
+    return common.getResult(false, null);
+  }
+
+  /**
+   * Converts a scalar-function argument into a theta {@link Sketch}.
+   *
+   * <p>Accepted representations:
+   * <ul>
+   *   <li>a {@link Sketch}, returned unchanged;</li>
+   *   <li>a {@code byte[]}, passed through {@code Memory.wrap} and {@code Sketches.wrapSketch};</li>
+   *   <li>a {@code String}, Base64-decoded and then handled like a {@code byte[]}.</li>
+   * </ul>
+   *
+   * @param sketchObj the argument to convert
+   * @return the sketch backing the argument
+   * @throws RuntimeException if the argument is {@code null} or of any other type
+   */
+  private static Sketch asThetaSketch(Object sketchObj) {
+    if (sketchObj instanceof Sketch) {
+      return (Sketch) sketchObj;
+    }
+    if (sketchObj instanceof byte[]) {
+      return Sketches.wrapSketch(Memory.wrap((byte[]) sketchObj));
+    }
+    if (sketchObj instanceof String) {
+      byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+      return Sketches.wrapSketch(Memory.wrap(decoded));
+    }
+    // A null argument fails every instanceof check; describe it explicitly instead of letting getClass() raise a
+    // NullPointerException that hides the real problem.
+    String typeName = sketchObj == null ? "null" : sketchObj.getClass().toString();
+    throw new RuntimeException("Exception occurred getting estimate from Theta Sketch, unsupported Object type: "
+        + typeName);
+  }
 }
diff --git a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
index f5a765e9ad..1bc39d0953 100644
--- a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
@@ -63,6 +63,26 @@
       {
         "sql": "SELECT SUMPRECISION(decimal_col) FROM {tbl}",
         "outputs": [["10000000000100000000110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+      },
+      {
+        "sql": "SELECT string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col",
+        "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+      },
+      {
+        "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col",
+        "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+      },
+      {
+        "sql": "select GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_col, ''),  [...]
+        "outputs": [[3, 0, 6, 6]]
+      },
+      {
+        "sql": "select bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_ [...]
+        "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]]
+      },
+      {
+        "sql": "select /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_S [...]
+        "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]]
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org