You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2023/05/16 22:55:36 UTC
[pinot] branch master updated: Add PercentileKLL aggregation function (#10643)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 41fbb32846 Add PercentileKLL aggregation function (#10643)
41fbb32846 is described below
commit 41fbb32846eb5ec6e656db6e57fa4aaa7177ff9f
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Tue May 16 15:55:30 2023 -0700
Add PercentileKLL aggregation function (#10643)
* Add KllAggregationFunction for efficient percentile calculation
* Fix test
* Address review comments
* Fix merge issue
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 27 ++-
.../function/AggregationFunctionFactory.java | 43 ++++
.../function/PercentileKLLAggregationFunction.java | 255 ++++++++++++++++++++
.../PercentileKLLMVAggregationFunction.java | 126 ++++++++++
.../PercentileRawKLLAggregationFunction.java | 58 +++++
.../PercentileRawKLLMVAggregationFunction.java | 58 +++++
.../function/AggregationFunctionFactoryTest.java | 6 +
...SegmentAggregationMultiValueRawQueriesTest.java | 41 ++++
...erSegmentAggregationSingleValueQueriesTest.java | 111 +++++++++
.../pinot/queries/PercentileKLLMVQueriesTest.java | 94 ++++++++
.../pinot/queries/PercentileKLLQueriesTest.java | 256 +++++++++++++++++++++
.../segment/local/customobject/SerializedKLL.java | 49 ++++
.../pinot/segment/spi/AggregationFunctionType.java | 12 +
13 files changed, 1135 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 9412014cef..14af9bac95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.common.CustomObject;
@@ -129,7 +130,8 @@ public class ObjectSerDeUtils {
CovarianceTuple(32),
VarianceTuple(33),
PinotFourthMoment(34),
- ArgMinMaxObject(35);
+ ArgMinMaxObject(35),
+ KllDataSketch(36);
private final int _value;
@@ -178,6 +180,8 @@ public class ObjectSerDeUtils {
return ObjectType.DistinctTable;
} else if (value instanceof Sketch) {
return ObjectType.DataSketch;
+ } else if (value instanceof KllDoublesSketch) {
+ return ObjectType.KllDataSketch;
} else if (value instanceof Geometry) {
return ObjectType.Geometry;
} else if (value instanceof RoaringBitmap) {
@@ -922,6 +926,26 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() {
+
+ @Override
+ public byte[] serialize(KllDoublesSketch value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ public KllDoublesSketch deserialize(byte[] bytes) {
+ return KllDoublesSketch.wrap(Memory.wrap(bytes));
+ }
+
+ @Override
+ public KllDoublesSketch deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return KllDoublesSketch.wrap(Memory.wrap(bytes));
+ }
+ };
+
public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new ObjectSerDe<Geometry>() {
@Override
@@ -1273,6 +1297,7 @@ public class ObjectSerDeUtils {
VARIANCE_TUPLE_OBJECT_SER_DE,
PINOT_FOURTH_MOMENT_OBJECT_SER_DE,
ARG_MIN_MAX_OBJECT_SER_DE,
+ KLL_SKETCH_SER_DE,
};
//@formatter:on
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index ba3fc837c4..418b864400 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -55,6 +55,17 @@ public class AggregationFunctionFactory {
if (remainingFunctionName.equals("SMARTTDIGEST")) {
return new PercentileSmartTDigestAggregationFunction(arguments);
}
+ if (remainingFunctionName.contains("KLL")) {
+ if (remainingFunctionName.equals("KLL")) {
+ return new PercentileKLLAggregationFunction(arguments);
+ } else if (remainingFunctionName.equals("KLLMV")) {
+ return new PercentileKLLMVAggregationFunction(arguments);
+ } else if (remainingFunctionName.equals("RAWKLL")) {
+ return new PercentileRawKLLAggregationFunction(arguments);
+ } else if (remainingFunctionName.equals("RAWKLLMV")) {
+ return new PercentileRawKLLMVAggregationFunction(arguments);
+ }
+ }
int numArguments = arguments.size();
if (numArguments == 1) {
// Single argument percentile (e.g. Percentile99(foo), PercentileTDigest95(bar), etc.)
@@ -77,6 +88,14 @@ public class AggregationFunctionFactory {
// PercentileRawTDigest
String percentileString = remainingFunctionName.substring(10);
return new PercentileRawTDigestAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("KLL\\d+")) {
+ // PercentileKLL
+ String percentileString = remainingFunctionName.substring(3);
+ return new PercentileKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWKLL\\d+")) {
+ // PercentileRawKLL
+ String percentileString = remainingFunctionName.substring(6);
+ return new PercentileRawKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
} else if (remainingFunctionName.matches("\\d+MV")) {
// PercentileMV
String percentileString = remainingFunctionName.substring(0, remainingFunctionName.length() - 2);
@@ -97,6 +116,14 @@ public class AggregationFunctionFactory {
// PercentileRawTDigestMV
String percentileString = remainingFunctionName.substring(10, remainingFunctionName.length() - 2);
return new PercentileRawTDigestMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("KLL\\d+MV")) {
+ // PercentileKLLMV
+ String percentileString = remainingFunctionName.substring(3, remainingFunctionName.length() - 2);
+ return new PercentileKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+ } else if (remainingFunctionName.matches("RAWKLL\\d+MV")) {
+ // PercentileRawKLLMV
+ String percentileString = remainingFunctionName.substring(6, remainingFunctionName.length() - 2);
+ return new PercentileRawKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
}
} else if (numArguments == 2) {
// Double arguments percentile (e.g. percentile(foo, 99), percentileTDigest(bar, 95), etc.) where the
@@ -123,6 +150,14 @@ public class AggregationFunctionFactory {
// PercentileRawTDigest
return new PercentileRawTDigestAggregationFunction(firstArgument, percentile);
}
+ if (remainingFunctionName.equals("KLL")) {
+ // PercentileKLL
+ return new PercentileKLLAggregationFunction(firstArgument, percentile);
+ }
+ if (remainingFunctionName.equals("RAWKLL")) {
+ // PercentileRawKLL
+ return new PercentileRawKLLAggregationFunction(firstArgument, percentile);
+ }
if (remainingFunctionName.equals("MV")) {
// PercentileMV
return new PercentileMVAggregationFunction(firstArgument, percentile);
@@ -143,6 +178,14 @@ public class AggregationFunctionFactory {
// PercentileRawTDigestMV
return new PercentileRawTDigestMVAggregationFunction(firstArgument, percentile);
}
+ if (remainingFunctionName.equals("KLLMV")) {
+ // PercentileKLLMV
+ return new PercentileKLLMVAggregationFunction(firstArgument, percentile);
+ }
+ if (remainingFunctionName.equals("RAWKLLMV")) {
+ // PercentileRawKLLMV
+ return new PercentileRawKLLMVAggregationFunction(firstArgument, percentile);
+ }
} else if (numArguments == 3) {
// Triple arguments percentile (e.g. percentileTDigest(bar, 95, 1000), etc.) where the
// second argument is a decimal number from 0.0 to 100.0 and third argument is a decimal number indicating
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java
new file mode 100644
index 0000000000..3584aafa41
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * <p>
+ * {@code PercentileKLLAggregationFunction} provides an approximate percentile calculator using the KLL algorithm
+ * from <a href="https://datasketches.apache.org/docs/KLL/KLLSketch.html">Apache DataSketches library</a>.
+ * </p>
+ * <p>
+ * The interface is similar to the plain 'Percentile' function, except for the optional K value, which determines
+ * the size, and hence the accuracy, of the sketch.
+ * </p>
+ * <p><b>PERCENTILE_KLL(col, percentile, kValue)</b></p>
+ * <p>E.g.:</p>
+ * <ul>
+ * <li><b>PERCENTILE_KLL(col, 90)</b></li>
+ * <li><b>PERCENTILE_KLL(col, 99.9, 800)</b></li>
+ * </ul>
+ *
+ * <p>
+ * If the column type is BYTES, the aggregation function will assume it is a serialized KllDoublesSketch and will
+ * attempt to deserialize it for further processing.
+ * </p>
+ *
+ * <p>
+ * There is a variation of the function (<b>PERCENTILE_RAW_KLL</b>) that returns the Base64 encoded
+ * sketch object to be used externally.
+ * </p>
+ */
+public class PercentileKLLAggregationFunction
+ extends BaseSingleInputAggregationFunction<KllDoublesSketch, Comparable> {
+
+ protected final double _percentile;
+ protected int _kValue = 200; // size of the sketch. This is the default size used by DataSketches lib as well
+
+ public PercentileKLLAggregationFunction(List<ExpressionContext> arguments) {
+   // Argument layout is (column, percentile[, k]); the column expression is handed to the base class.
+   super(arguments.get(0));
+
+   // Only the 2-argument and 3-argument forms are accepted.
+   int numArguments = arguments.size();
+   Preconditions.checkArgument(numArguments == 2 || numArguments == 3,
+       "Expecting 2 or 3 arguments for PercentileKLL function: PERCENTILE_KLL(column, percentile, k=200)");
+   // The percentile is read as a double literal and must lie within [0, 100].
+   _percentile = arguments.get(1).getLiteral().getDoubleValue();
+   Preconditions.checkArgument(_percentile >= 0 && _percentile <= 100,
+       "Percentile value needs to be in range 0-100, inclusive");
+   if (numArguments == 3) {
+     _kValue = arguments.get(2).getLiteral().getIntValue(); // optional override of the default sketch size
+   }
+ }
+
+ public PercentileKLLAggregationFunction(ExpressionContext expression, double percentile) {
+ super(expression);
+ _percentile = percentile;
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILEKLL;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+ KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+ if (valueType == DataType.BYTES) {
+ // Assuming the column contains serialized data sketch
+ KllDoublesSketch[] deserializedSketches =
+ deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ sketch.merge(deserializedSketches[i]);
+ }
+ } else {
+ double[] values = valueSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ sketch.update(values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+
+ if (valueType == DataType.BYTES) {
+ // serialized sketch
+ KllDoublesSketch[] deserializedSketches =
+ deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+ sketch.merge(deserializedSketches[i]);
+ }
+ } else {
+ double[] values = valueSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+ sketch.update(values[i]);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+
+ if (valueType == DataType.BYTES) {
+ // serialized sketch
+ KllDoublesSketch[] deserializedSketches =
+ deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+ sketch.merge(deserializedSketches[i]);
+ }
+ }
+ } else {
+ double[] values = valueSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+ sketch.update(values[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Extracts the sketch from the result holder or creates a new one if it does not exist.
+ */
+ protected KllDoublesSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+ KllDoublesSketch sketch = aggregationResultHolder.getResult();
+ if (sketch == null) {
+ sketch = KllDoublesSketch.newHeapInstance(_kValue);
+ aggregationResultHolder.setValue(sketch);
+ }
+ return sketch;
+ }
+
+ /**
+ * Extracts the sketch from the group-by result holder for the given group key,
+ * or creates a new one if it does not exist.
+ */
+ protected KllDoublesSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+ KllDoublesSketch sketch = groupByResultHolder.getResult(groupKey);
+ if (sketch == null) {
+ sketch = KllDoublesSketch.newHeapInstance(_kValue);
+ groupByResultHolder.setValueForKey(groupKey, sketch);
+ }
+ return sketch;
+ }
+
+ /**
+ * Deserializes the sketches from the bytes.
+ */
+ protected KllDoublesSketch[] deserializeSketches(byte[][] serializedSketches) {
+ KllDoublesSketch[] sketches = new KllDoublesSketch[serializedSketches.length];
+ for (int i = 0; i < serializedSketches.length; i++) {
+ sketches[i] = KllDoublesSketch.wrap(Memory.wrap(serializedSketches[i]));
+ }
+ return sketches;
+ }
+
+ @Override
+ public KllDoublesSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ return aggregationResultHolder.getResult();
+ }
+
+ @Override
+ public KllDoublesSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+ return groupByResultHolder.getResult(groupKey);
+ }
+
+ @Override
+ public KllDoublesSketch merge(KllDoublesSketch sketch1, KllDoublesSketch sketch2) {
+ KllDoublesSketch union = KllDoublesSketch.newHeapInstance(_kValue);
+ if (sketch1 != null) {
+ union.merge(sketch1);
+ }
+ if (sketch2 != null) {
+ union.merge(sketch2);
+ }
+ return union;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.DOUBLE;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return AggregationFunctionType.PERCENTILEKLL.getName().toLowerCase()
+ + "(" + _expression + ", " + _percentile + ")";
+ }
+
+ @Override
+ public Comparable extractFinalResult(KllDoublesSketch sketch) {
+ return sketch.getQuantile(_percentile / 100);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java
new file mode 100644
index 0000000000..61d1bd0bce
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class PercentileKLLMVAggregationFunction extends PercentileKLLAggregationFunction {
+ public PercentileKLLMVAggregationFunction(ExpressionContext expression, double percentile) {
+ super(expression, percentile);
+ }
+
+ public PercentileKLLMVAggregationFunction(List<ExpressionContext> arguments) {
+ super(arguments);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+ KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+ if (valueType == DataType.BYTES) {
+ // Assuming the column contains serialized data sketches
+ KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ sketch.merge(deserializedSketches[i]);
+ }
+ } else {
+ double[][] values = valueSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (double val : values[i]) {
+ sketch.update(val);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+
+ if (valueType == DataType.BYTES) {
+ // serialized sketch
+ KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+ sketch.merge(deserializedSketches[i]);
+ }
+ } else {
+ double[][] values = valueSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (double val : values[i]) {
+ sketch.update(val);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet valueSet = blockValSetMap.get(_expression);
+ DataType valueType = valueSet.getValueType();
+
+ if (valueType == DataType.BYTES) {
+ // serialized sketch
+ KllDoublesSketch[] deserializedSketches = deserializeSketches(blockValSetMap.get(_expression).getBytesValuesSV());
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ KllDoublesSketch sketch = this.getOrCreateSketch(groupByResultHolder, groupKey);
+ sketch.merge(deserializedSketches[i]);
+ }
+ }
+ } else {
+ double[][] values = valueSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+ for (double val : values[i]) {
+ sketch.update(val);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILEKLLMV;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return AggregationFunctionType.PERCENTILEKLLMV.getName().toLowerCase()
+ + "(" + _expression + ", " + _percentile + ")";
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java
new file mode 100644
index 0000000000..48bd421ee3
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedKLL;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class PercentileRawKLLAggregationFunction extends PercentileKLLAggregationFunction {
+ public PercentileRawKLLAggregationFunction(ExpressionContext expression, double percentile) {
+ super(expression, percentile);
+ }
+
+ public PercentileRawKLLAggregationFunction(List<ExpressionContext> arguments) {
+ super(arguments);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWKLL;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return AggregationFunctionType.PERCENTILERAWKLL.getName().toLowerCase()
+ + "(" + _expression + ", " + _percentile + ")";
+ }
+
+ @Override
+ public SerializedKLL extractFinalResult(KllDoublesSketch sketch) {
+ return new SerializedKLL(sketch, _percentile);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java
new file mode 100644
index 0000000000..3c0885e970
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedKLL;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class PercentileRawKLLMVAggregationFunction extends PercentileKLLMVAggregationFunction {
+ public PercentileRawKLLMVAggregationFunction(ExpressionContext expression, double percentile) {
+ super(expression, percentile);
+ }
+
+ public PercentileRawKLLMVAggregationFunction(List<ExpressionContext> arguments) {
+ super(arguments);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.PERCENTILERAWKLLMV;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return AggregationFunctionType.PERCENTILERAWKLLMV.getName().toLowerCase()
+ + "(" + _expression + ", " + _percentile + ")";
+ }
+
+ @Override
+ public SerializedKLL extractFinalResult(KllDoublesSketch sketch) {
+ return new SerializedKLL(sketch, _percentile);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index a0e10869e0..a4eaa1f991 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -281,6 +281,12 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST);
assertEquals(aggregationFunction.getResultColumnName(), "percentilerawtdigest(column, 99.9999)");
+ function = getFunction("PeRcEntiLEkll", "(column, 99.9999)");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof PercentileKLLAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILEKLL);
+ assertEquals(aggregationFunction.getResultColumnName(), "percentilekll(column, 99.9999)");
+
function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999, 500)");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index fdd9a0ed65..65480aa95f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.queries;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+import org.apache.calcite.avatica.util.Base64;
+import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -41,6 +44,8 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
// Allow 5% quantile error due to the randomness of TDigest merge
private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE;
+ // Allow 2% quantile error due to the randomness of KLL merge
+ private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE;
@Test
public void testCountMV() {
@@ -540,6 +545,42 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA);
}
+ @Test
+ public void testPercentileRawKLLMV() {
+ testPercentileRawKLLMV(50);
+ testPercentileRawKLLMV(90);
+ testPercentileRawKLLMV(95);
+ testPercentileRawKLLMV(99);
+ }
+
+ private void testPercentileRawKLLMV(int percentile) {
+ Function<Object, Object> quantileExtractor =
+ value -> {
+ try {
+ KllDoublesSketch sketch =
+ (KllDoublesSketch) ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize(Base64.decode((String) value));
+ return sketch.getQuantile(percentile / 100.0);
+ } catch (IOException e) {
+ return null;
+ }
+ };
+
+ String rawKllQuery = String.format("SELECT PERCENTILERAWKLL%dMV(column6) AS value FROM testTable", percentile);
+ String regularQuery = String.format("SELECT PERCENTILE%dMV(column6) AS value FROM testTable", percentile);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery), getBrokerResponse(regularQuery),
+ quantileExtractor, PERCENTILE_KLL_DELTA);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER),
+ getBrokerResponse(regularQuery + FILTER), quantileExtractor, PERCENTILE_KLL_DELTA);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + SV_GROUP_BY),
+ getBrokerResponse(regularQuery + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + SV_GROUP_BY),
+ getBrokerResponse(regularQuery + FILTER + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + MV_GROUP_BY),
+ getBrokerResponse(regularQuery + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+ QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + MV_GROUP_BY),
+ getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+ }
+
@Test
public void testNumGroupsLimit() {
String query = "SELECT COUNT(*) FROM testTable GROUP BY column6";
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 5fcad1307d..a6e8320962 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -40,6 +40,8 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
// Allow 5% quantile error due to the randomness of TDigest merge
private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE;
+ // Allow 2% quantile error due to the randomness of KLL merge
+ private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE;
@Test
public void testCount() {
@@ -589,6 +591,115 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
getBrokerResponse(regularQuery + FILTER + GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA);
}
+ @Test
+ public void testPercentileKLL() {
+ String query = "SELECT PERCENTILEKLL(column1, 50) AS v1, PERCENTILEKLL(column3, 50) AS v2 FROM testTable";
+ // 50th percentile, plain aggregation. NOTE(review): the four long arguments are presumably docs scanned, entries scanned in filter, entries scanned post filter and total docs - confirm against QueriesTestUtils.
+ BrokerResponseNative brokerResponse = getBrokerResponse(query);
+ DataSchema expectedDataSchema =
+ new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+ ResultTable expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1107310944L, 1082130431L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // Same aggregation with FILTER appended to the query.
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1139674505L, 509607935L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // With GROUP_BY appended; a single expected row is supplied.
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // FILTER and GROUP_BY combined.
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // 90th percentile, same four query shapes.
+ query = "SELECT PERCENTILEKLL(column1, 90) AS v1, PERCENTILEKLL(column3, 90) AS v2 FROM testTable";
+
+ brokerResponse = getBrokerResponse(query);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1946157055L, 1946157055L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1939865599L, 902299647L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // NOTE(review): this expected GROUP_BY row is identical to the one used for the 50th percentile.
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // NOTE(review): this expected FILTER + GROUP_BY row is identical to the one used for the 50th percentile.
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // 95th percentile, same four query shapes.
+ query = "SELECT PERCENTILEKLL(column1, 95) AS v1, PERCENTILEKLL(column3, 95) AS v2 FROM testTable";
+
+ brokerResponse = getBrokerResponse(query);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2080374783L, 2051014655L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2109734911L, 950009855L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // NOTE(review): again the same expected GROUP_BY row as for the 50th and 90th percentiles.
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // NOTE(review): again the same expected FILTER + GROUP_BY row as for the 50th and 90th percentiles.
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // 99th percentile, same four query shapes.
+ query = "SELECT PERCENTILEKLL(column1, 99) AS v1, PERCENTILEKLL(column3, 99) AS v2 FROM testTable";
+
+ brokerResponse = getBrokerResponse(query);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2143289343L, 2143289343L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+ brokerResponse = getBrokerResponse(query + FILTER);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 991952895L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // NOTE(review): the GROUP_BY expectation is still the row used for the lower percentiles.
+ brokerResponse = getBrokerResponse(query + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+ 360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ // Unlike the lower percentiles, the FILTER + GROUP_BY expectation differs here.
+ brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+ expectedResultTable =
+ new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 993001471L}));
+ QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+ 73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+ }
+
@Test
public void testNumGroupsLimit() {
String query = "SELECT COUNT(*) FROM testTable GROUP BY column1";
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java
new file mode 100644
index 0000000000..40a8b9046d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Multi-value flavour of {@link PercentileKLLQueriesTest}: each row's double column holds between 1 and
+ * {@code MAX_NUM_MULTI_VALUES} random values, and the aggregation query uses the MV percentile functions.
+ */
+public class PercentileKLLMVQueriesTest extends PercentileKLLQueriesTest {
+  private static final int MAX_NUM_MULTI_VALUES = 10;
+
+  /**
+   * Generates NUM_ROWS rows and builds the test segment from them. For every row the same random values go into
+   * both the multi-value double column and the KLL sketch stored in the bytes column.
+   */
+  @Override
+  protected void buildSegment()
+      throws Exception {
+    List<GenericRow> generatedRows = new ArrayList<>(NUM_ROWS);
+    for (int rowId = 0; rowId < NUM_ROWS; rowId++) {
+      int valueCount = 1 + RANDOM.nextInt(MAX_NUM_MULTI_VALUES);
+      Double[] multiValues = new Double[valueCount];
+      KllDoublesSketch rowSketch = KllDoublesSketch.newHeapInstance();
+      for (int valueId = 0; valueId < valueCount; valueId++) {
+        double randomValue = RANDOM.nextDouble() * VALUE_RANGE;
+        multiValues[valueId] = randomValue;
+        rowSketch.update(randomValue);
+      }
+
+      GenericRow genericRow = new GenericRow();
+      genericRow.putValue(DOUBLE_COLUMN, multiValues);
+      genericRow.putValue(KLL_COLUMN, rowSketch.toByteArray());
+      genericRow.putValue(GROUP_BY_COLUMN, GROUPS[RANDOM.nextInt(GROUPS.length)]);
+      generatedRows.add(genericRow);
+    }
+
+    // The double column is declared as a dimension here (third argument false), unlike the metric in the parent.
+    Schema segmentSchema = new Schema();
+    segmentSchema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE, false));
+    segmentSchema.addField(new MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES));
+    segmentSchema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+    TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(offlineTableConfig, segmentSchema);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+    generatorConfig.setTableName(TABLE_NAME);
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    generatorConfig.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN));
+
+    SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader rowReader = new GenericRowRecordReader(generatedRows)) {
+      creationDriver.init(generatorConfig, rowReader);
+      creationDriver.build();
+    }
+  }
+
+  /**
+   * Builds the six-column aggregation query using the MV functions on the double column and the plain
+   * PERCENTILEKLL function on the sketch column.
+   *
+   * @param percentile percentile substituted into both the name-embedded and the argument forms
+   */
+  @Override
+  protected String getAggregationQuery(int percentile) {
+    String queryTemplate = "SELECT PERCENTILE%1$dMV(%2$s), PERCENTILEKLL%1$dMV(%2$s), PERCENTILEKLL%1$d(%3$s), "
+        + "PERCENTILEMV(%2$s, %1$d), PERCENTILEKLLMV(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s";
+    return String.format(queryTemplate, percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java
new file mode 100644
index 0000000000..05e626e4b4
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import it.unimi.dsi.fastutil.doubles.DoubleList;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for PERCENTILE_KLL aggregation function.
+ *
+ * <ul>
+ * <li>Generates a segment with a double column, a KLL column and a group-by column</li>
+ * <li>Runs aggregation and group-by queries on the generated segment</li>
+ * <li>
+ * Compares the results for PERCENTILE_KLL on double column and KLL column with results for PERCENTILE on
+ * double column
+ * </li>
+ * </ul>
+ */
+public class PercentileKLLQueriesTest extends BaseQueriesTest {
+ protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileKllQueriesTest");
+ protected static final String TABLE_NAME = "testTable";
+ protected static final String SEGMENT_NAME = "testSegment";
+ // Data-generation parameters; protected so that subclasses can reuse them.
+ protected static final int NUM_ROWS = 1000;
+ protected static final int VALUE_RANGE = Integer.MAX_VALUE;
+ protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% delta
+ protected static final String DOUBLE_COLUMN = "doubleColumn";
+ protected static final String KLL_COLUMN = "kllColumn";
+ protected static final String GROUP_BY_COLUMN = "groupByColumn";
+ protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"};
+ protected static final long RANDOM_SEED = System.nanoTime(); // time-based seed, reported through ERROR_MESSAGE
+ protected static final Random RANDOM = new Random(RANDOM_SEED);
+ protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
+ // Segment loaded in setUp(); _indexSegments lists that same segment twice.
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+ // Overrides required by BaseQueriesTest.
+ @Override
+ protected String getFilter() {
+ return ""; // No filtering required for this test.
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+ // Clears INDEX_DIR, builds the segment and loads it in mmap mode before the tests run.
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ buildSegment();
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+ }
+ // Generates NUM_ROWS rows (random double, a KLL sketch holding only that double, random group) and builds the segment.
+ protected void buildSegment()
+ throws Exception {
+ List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ GenericRow row = new GenericRow();
+
+ double value = RANDOM.nextDouble() * VALUE_RANGE;
+ row.putValue(DOUBLE_COLUMN, value);
+ // The sketch column stores the serialized bytes of a sketch updated with just this row's value.
+ KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance();
+ sketch.update(value);
+ row.putValue(KLL_COLUMN, sketch.toByteArray());
+
+ String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
+ row.putValue(GROUP_BY_COLUMN, group);
+
+ rows.add(row);
+ }
+ // The double and sketch columns are metrics; the group-by column is a string dimension.
+ Schema schema = new Schema();
+ schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE));
+ schema.addField(new MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES));
+ schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ // The sketch column is the only one registered for raw index creation.
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
+ config.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN));
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
+ }
+ }
+ // Intermediate results 0 and 3 are value lists; results 1, 2, 4 and 5 are KLL sketches checked against list 0.
+ @Test
+ public void testInnerSegmentAggregation() {
+ // For inner segment case, percentile does not affect the intermediate result
+ AggregationOperator aggregationOperator = getOperator(getAggregationQuery(0));
+ AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+ List<Object> aggregationResult = resultsBlock.getResults();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 6);
+ DoubleList doubleList0 = (DoubleList) aggregationResult.get(0);
+ Collections.sort(doubleList0);
+ assertSketch((KllDoublesSketch) aggregationResult.get(1), doubleList0);
+ assertSketch((KllDoublesSketch) aggregationResult.get(2), doubleList0);
+
+ DoubleList doubleList3 = (DoubleList) aggregationResult.get(3);
+ Collections.sort(doubleList3);
+ assertEquals(doubleList3, doubleList0);
+ assertSketch((KllDoublesSketch) aggregationResult.get(4), doubleList0);
+ assertSketch((KllDoublesSketch) aggregationResult.get(5), doubleList0);
+ }
+ // For every percentile 0..100, columns 1-5 of the broker result must match column 0 within DELTA.
+ @Test
+ public void testInterSegmentAggregation() {
+ for (int percentile = 0; percentile <= 100; percentile++) {
+ BrokerResponseNative brokerResponse = getBrokerResponse(getAggregationQuery(percentile));
+ Object[] results = brokerResponse.getResultTable().getRows().get(0);
+ assertEquals(results.length, 6);
+ double expectedResult = (Double) results[0];
+ for (int i = 1; i < 6; i++) {
+ assertEquals((Double) results[i], expectedResult, DELTA, ERROR_MESSAGE);
+ }
+ }
+ }
+ // Group-by counterpart of testInnerSegmentAggregation: the same checks are applied per group id.
+ @Test
+ public void testInnerSegmentGroupBy() {
+ // For inner segment case, percentile does not affect the intermediate result
+ GroupByOperator groupByOperator = getOperator(getGroupByQuery(0));
+ GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+ AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+ assertNotNull(groupByResult);
+ Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
+ while (groupKeyIterator.hasNext()) {
+ int groupId = groupKeyIterator.next()._groupId;
+ DoubleList doubleList0 = (DoubleList) groupByResult.getResultForGroupId(0, groupId);
+ Collections.sort(doubleList0);
+ assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(1, groupId), doubleList0);
+ assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(2, groupId), doubleList0);
+
+ DoubleList doubleList3 = (DoubleList) groupByResult.getResultForGroupId(3, groupId);
+ Collections.sort(doubleList3);
+ assertEquals(doubleList3, doubleList0);
+ assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(4, groupId), doubleList0);
+ assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(5, groupId), doubleList0);
+ }
+ }
+ // Group-by counterpart of testInterSegmentAggregation; exactly three result rows are expected.
+ @Test
+ public void testInterSegmentGroupBy() {
+ for (int percentile = 0; percentile <= 100; percentile++) {
+ BrokerResponseNative brokerResponse = getBrokerResponse(getGroupByQuery(percentile));
+ List<Object[]> rows = brokerResponse.getResultTable().getRows();
+ assertEquals(rows.size(), 3);
+ for (Object[] row : rows) {
+ assertEquals(row.length, 6);
+ double expectedResult = (Double) row[0];
+ for (int i = 1; i < 6; i++) {
+ assertEquals((Double) row[i], expectedResult, DELTA, ERROR_MESSAGE);
+ }
+ }
+ }
+ }
+ // Six aggregations: PERCENTILE and PERCENTILEKLL, each with the percentile embedded in the name and as an argument.
+ protected String getAggregationQuery(int percentile) {
+ return String.format("SELECT PERCENTILE%1$d(%2$s), PERCENTILEKLL%1$d(%2$s), PERCENTILEKLL%1$d(%3$s), "
+ + "PERCENTILE(%2$s, %1$d), PERCENTILEKLL(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s",
+ percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME);
+ }
+ // Appends a GROUP BY on the group-by column to the aggregation query.
+ private String getGroupByQuery(int percentile) {
+ return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN);
+ }
+ // Checks every integer percentile of the sketch against the value list (callers sort it first) within DELTA.
+ private void assertSketch(KllDoublesSketch sketch, DoubleList doubleList) {
+ for (int percentile = 0; percentile <= 100; percentile++) {
+ double expected;
+ if (percentile == 100) {
+ expected = doubleList.getDouble(doubleList.size() - 1);
+ } else {
+ expected = doubleList.getDouble(doubleList.size() * percentile / 100);
+ }
+ assertEquals(sketch.getQuantile(percentile / 100.0), expected, DELTA, ERROR_MESSAGE);
+ }
+ }
+ // Destroys the loaded segment and deletes the index directory.
+ @AfterClass
+ public void tearDown() {
+ _indexSegment.destroy();
+ FileUtils.deleteQuietly(INDEX_DIR);
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java
new file mode 100644
index 0000000000..f31f7a198d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import org.apache.datasketches.kll.KllDoublesSketch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Serialized and comparable version of KllDoublesSketch.
+ * Compares two sketches for a specific percentile value.
+ */
+public class SerializedKLL implements Comparable<SerializedKLL> {
+ private final double _quantile;
+ private final KllDoublesSketch _sketch;
+
+ public SerializedKLL(KllDoublesSketch sketch, double percentile) {
+ _sketch = sketch;
+ _quantile = percentile / 100.0;
+ }
+
+ @Override
+ public int compareTo(SerializedKLL other) {
+ checkArgument(other._quantile == _quantile, "Quantile numbers don't match");
+ return Double.compare(_sketch.getQuantile(_quantile), other._sketch.getQuantile(_quantile));
+ }
+
+ @Override
+ public String toString() {
+ return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 0a2a22dd00..a9d2085c8f 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -58,6 +58,8 @@ public enum AggregationFunctionType {
PERCENTILETDIGEST("percentileTDigest"),
PERCENTILERAWTDIGEST("percentileRawTDigest"),
PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
+ PERCENTILEKLL("percentileKLL"),
+ PERCENTILERAWKLL("percentileRawKLL"),
IDSET("idSet"),
HISTOGRAM("histogram"),
COVARPOP("covarPop"),
@@ -91,6 +93,8 @@ public enum AggregationFunctionType {
PERCENTILERAWESTMV("percentileRawEstMV"),
PERCENTILETDIGESTMV("percentileTDigestMV"),
PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
+ PERCENTILEKLLMV("percentileKLLMV"),
+ PERCENTILERAWKLLMV("percentileRawKLLMV"),
DISTINCT("distinct"),
// boolean aggregate functions
@@ -151,6 +155,10 @@ public enum AggregationFunctionType {
return PERCENTILETDIGEST;
} else if (remainingFunctionName.equals("RAWTDIGEST") || remainingFunctionName.matches("RAWTDIGEST\\d+")) {
return PERCENTILERAWTDIGEST;
+ } else if (remainingFunctionName.equals("KLL") || remainingFunctionName.matches("KLL\\d+")) {
+ return PERCENTILEKLL;
+ } else if (remainingFunctionName.equals("RAWKLL") || remainingFunctionName.matches("RAWKLL\\d+")) {
+ return PERCENTILERAWKLL;
} else if (remainingFunctionName.equals("MV") || remainingFunctionName.matches("\\d+MV")) {
return PERCENTILEMV;
} else if (remainingFunctionName.equals("ESTMV") || remainingFunctionName.matches("EST\\d+MV")) {
@@ -161,6 +169,10 @@ public enum AggregationFunctionType {
return PERCENTILETDIGESTMV;
} else if (remainingFunctionName.equals("RAWTDIGESTMV") || remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
return PERCENTILERAWTDIGESTMV;
+ } else if (remainingFunctionName.equals("KLLMV") || remainingFunctionName.matches("KLL\\d+MV")) {
+ return PERCENTILEKLLMV;
+ } else if (remainingFunctionName.equals("RAWKLLMV") || remainingFunctionName.matches("RAWKLL\\d+MV")) {
+ return PERCENTILERAWKLLMV; // raw multi-value variant; PERCENTILEKLLMV is handled by the branch above
} else {
throw new IllegalArgumentException("Invalid aggregation function name: " + functionName);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org