You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/07/27 18:50:44 UTC
[pinot] branch master updated: add theta sketch scalar (#11153)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 11b85df331 add theta sketch scalar (#11153)
11b85df331 is described below
commit 11b85df3318ab414a7dff30e2b1d979259b984fc
Author: Rong Rong <ro...@apache.org>
AuthorDate: Thu Jul 27 11:50:36 2023 -0700
add theta sketch scalar (#11153)
* add theta sketch scalar functions
* fix sketch name to add "theta" specifically, to differentiate it from other sketch types
---------
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../core/function/scalar/SketchFunctions.java | 92 ++++++++++++++++++++++
.../src/test/resources/queries/UDFAggregates.json | 20 +++++
2 files changed, 112 insertions(+)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index f6245bec6f..13577e7beb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -20,8 +20,15 @@ package org.apache.pinot.core.function.scalar;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.math.BigDecimal;
+import java.util.Base64;
import javax.annotation.Nullable;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.AnotB;
+import org.apache.datasketches.theta.Intersection;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.Union;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
@@ -65,6 +72,8 @@ import org.apache.pinot.spi.utils.CommonConstants;
* }
*/
public class SketchFunctions {
+ private static final SetOperationBuilder SET_OPERATION_BUILDER = new SetOperationBuilder();
+
private SketchFunctions() {
}
@@ -184,4 +193,87 @@ public class SketchFunctions {
}
return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
}
+
+ @ScalarFunction(names = {"getThetaSketchEstimate", "get_theta_sketch_estimate"})
+ public static long getThetaSketchEstimate(Object sketchObject) {
+ return Math.round(asThetaSketch(sketchObject).getEstimate());
+ }
+
+ /**
+ * Unions two theta sketches. Registered under the names {@code thetaSketchUnion} and {@code theta_sketch_union}.
+ * Delegates to {@code thetaSketchUnionVar}, which converts each argument with {@code asThetaSketch}.
+ *
+ * NOTE(review): presumably the fixed-arity overloads (2 to 5 arguments) exist because scalar function registration
+ * does not handle varargs - confirm against the function registry.
+ */
+ @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+ public static Sketch thetaSketchUnion(Object o1, Object o2) {
+ return thetaSketchUnionVar(o1, o2);
+ }
+
+ /** Three-argument variant of the theta sketch union; delegates to {@code thetaSketchUnionVar}. */
+ @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+ public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3) {
+ return thetaSketchUnionVar(o1, o2, o3);
+ }
+
+ /** Four-argument variant of the theta sketch union; delegates to {@code thetaSketchUnionVar}. */
+ @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+ public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4) {
+ return thetaSketchUnionVar(o1, o2, o3, o4);
+ }
+
+ /** Five-argument variant of the theta sketch union; delegates to {@code thetaSketchUnionVar}. */
+ @ScalarFunction(names = {"thetaSketchUnion", "theta_sketch_union"})
+ public static Sketch thetaSketchUnion(Object o1, Object o2, Object o3, Object o4, Object o5) {
+ return thetaSketchUnionVar(o1, o2, o3, o4, o5);
+ }
+
+ /**
+ * Intersects two theta sketches. Registered under the names {@code thetaSketchIntersect} and
+ * {@code theta_sketch_intersect}. Delegates to {@code thetaSketchIntersectVar}, which converts each argument with
+ * {@code asThetaSketch}.
+ *
+ * NOTE(review): presumably the fixed-arity overloads (2 to 5 arguments) exist because scalar function registration
+ * does not handle varargs - confirm against the function registry.
+ */
+ @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+ public static Sketch thetaSketchIntersect(Object o1, Object o2) {
+ return thetaSketchIntersectVar(o1, o2);
+ }
+
+ /** Three-argument variant of the theta sketch intersection; delegates to {@code thetaSketchIntersectVar}. */
+ @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+ public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3) {
+ return thetaSketchIntersectVar(o1, o2, o3);
+ }
+
+ /** Four-argument variant of the theta sketch intersection; delegates to {@code thetaSketchIntersectVar}. */
+ @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+ public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4) {
+ return thetaSketchIntersectVar(o1, o2, o3, o4);
+ }
+
+ /** Five-argument variant of the theta sketch intersection; delegates to {@code thetaSketchIntersectVar}. */
+ @ScalarFunction(names = {"thetaSketchIntersect", "theta_sketch_intersect"})
+ public static Sketch thetaSketchIntersect(Object o1, Object o2, Object o3, Object o4, Object o5) {
+ return thetaSketchIntersectVar(o1, o2, o3, o4, o5);
+ }
+
+  /**
+   * Computes the set difference "A and not B" of two theta sketches.
+   *
+   * @param sketchObjectA the sketch to subtract from; converted via {@code asThetaSketch}
+   * @param sketchObjectB the sketch whose entries are removed from A; converted via {@code asThetaSketch}
+   * @return the result sketch produced by the {@link AnotB} operation
+   */
+  @ScalarFunction(names = {"thetaSketchDiff", "theta_sketch_diff"})
+  public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB) {
+    AnotB aNotB = SET_OPERATION_BUILDER.buildANotB();
+    Sketch minuend = asThetaSketch(sketchObjectA);
+    aNotB.setA(minuend);
+    Sketch subtrahend = asThetaSketch(sketchObjectB);
+    aNotB.notB(subtrahend);
+    // Same arguments as before: unordered result, no destination memory, no reset of the operator.
+    return aNotB.getResult(false, null, false);
+  }
+
+  /**
+   * Unions an arbitrary number of sketch arguments into a single theta sketch.
+   *
+   * @param sketchObjects the arguments to union; each one is converted via {@code asThetaSketch}
+   * @return the result of the {@link Union} operation over all arguments
+   */
+  private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
+    Union accumulator = SET_OPERATION_BUILDER.buildUnion();
+    for (int i = 0; i < sketchObjects.length; i++) {
+      Sketch operand = asThetaSketch(sketchObjects[i]);
+      accumulator.union(operand);
+    }
+    return accumulator.getResult(false, null);
+  }
+
+  /**
+   * Intersects an arbitrary number of sketch arguments into a single theta sketch.
+   *
+   * @param sketchObjects the arguments to intersect; each one is converted via {@code asThetaSketch}
+   * @return the result of the {@link Intersection} operation over all arguments
+   */
+  private static Sketch thetaSketchIntersectVar(Object... sketchObjects) {
+    Intersection accumulator = SET_OPERATION_BUILDER.buildIntersection();
+    for (int i = 0; i < sketchObjects.length; i++) {
+      Sketch operand = asThetaSketch(sketchObjects[i]);
+      accumulator.intersect(operand);
+    }
+    return accumulator.getResult(false, null);
+  }
+
+  /**
+   * Converts a scalar function argument into a theta {@link Sketch}.
+   *
+   * <p>Supported inputs:
+   * <ul>
+   *   <li>{@link Sketch}: returned as is</li>
+   *   <li>{@code byte[]}: wrapped via {@code Sketches.wrapSketch(Memory.wrap(bytes))}</li>
+   *   <li>{@link String}: Base64-decoded, then wrapped the same way as {@code byte[]}</li>
+   * </ul>
+   *
+   * @param sketchObj the argument to convert
+   * @return the theta sketch represented by {@code sketchObj}
+   * @throws IllegalArgumentException if {@code sketchObj} is {@code null} or of an unsupported type, or if a
+   *     String argument is not valid Base64
+   */
+  private static Sketch asThetaSketch(Object sketchObj) {
+    if (sketchObj instanceof Sketch) {
+      return (Sketch) sketchObj;
+    }
+    if (sketchObj instanceof byte[]) {
+      return Sketches.wrapSketch(Memory.wrap((byte[]) sketchObj));
+    }
+    if (sketchObj instanceof String) {
+      byte[] decoded = Base64.getDecoder().decode((String) sketchObj);
+      return Sketches.wrapSketch(Memory.wrap(decoded));
+    }
+    // A null argument fails every instanceof check above; report it explicitly instead of dereferencing it,
+    // which previously surfaced as a bare NullPointerException from getClass().
+    String typeName = sketchObj == null ? "null" : sketchObj.getClass().getName();
+    // The message is operation-neutral because this helper backs estimate, union, intersect and diff alike.
+    throw new IllegalArgumentException("Cannot convert argument to Theta Sketch, unsupported Object type: "
+        + typeName);
+  }
}
diff --git a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
index f5a765e9ad..1bc39d0953 100644
--- a/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/UDFAggregates.json
@@ -63,6 +63,26 @@
{
"sql": "SELECT SUMPRECISION(decimal_col) FROM {tbl}",
"outputs": [["10000000000100000000110000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+ },
+ {
+ "sql": "SELECT string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col",
+ "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+ },
+ {
+ "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ string_col, SUMPRECISION(decimal_col) FROM {tbl} GROUP BY string_col",
+ "outputs": [["a" ,"10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 [...]
+ },
+ {
+ "sql": "select GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_col, ''), [...]
+ "outputs": [[3, 0, 6, 6]]
+ },
+ {
+ "sql": "select bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_INTERSECT(DISTINCT_COUNT_RAW_THETA_SKETCH(double_ [...]
+ "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]]
+ },
+ {
+ "sql": "select /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ bool_col, GET_THETA_SKETCH_ESTIMATE(DISTINCT_COUNT_RAW_THETA_SKETCH(string_col, 'nominalEntries=16')), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_DIFF(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_SKETCH_UNION(DISTINCT_COUNT_RAW_THETA_SKETCH(int_col, ''), DISTINCT_COUNT_RAW_THETA_SKETCH(long_col, ''))), GET_THETA_SKETCH_ESTIMATE(THETA_S [...]
+ "outputs": [[true, 2, 0, 2, 3], [false, 2, 0, 4, 3]]
}
]
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org