You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2023/05/16 22:55:36 UTC

[pinot] branch master updated: Add PercentileKLL aggregation function (#10643)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 41fbb32846 Add PercentileKLL aggregation function (#10643)
41fbb32846 is described below

commit 41fbb32846eb5ec6e656db6e57fa4aaa7177ff9f
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Tue May 16 15:55:30 2023 -0700

    Add PercentileKLL aggregation function (#10643)
    
    * Add KllAggregationFunction for efficient percentile calculation
    
    * Fix test
    
    * Address review comments
    
    * Fix merge issue
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  27 ++-
 .../function/AggregationFunctionFactory.java       |  43 ++++
 .../function/PercentileKLLAggregationFunction.java | 255 ++++++++++++++++++++
 .../PercentileKLLMVAggregationFunction.java        | 126 ++++++++++
 .../PercentileRawKLLAggregationFunction.java       |  58 +++++
 .../PercentileRawKLLMVAggregationFunction.java     |  58 +++++
 .../function/AggregationFunctionFactoryTest.java   |   6 +
 ...SegmentAggregationMultiValueRawQueriesTest.java |  41 ++++
 ...erSegmentAggregationSingleValueQueriesTest.java | 111 +++++++++
 .../pinot/queries/PercentileKLLMVQueriesTest.java  |  94 ++++++++
 .../pinot/queries/PercentileKLLQueriesTest.java    | 256 +++++++++++++++++++++
 .../segment/local/customobject/SerializedKLL.java  |  49 ++++
 .../pinot/segment/spi/AggregationFunctionType.java |  12 +
 13 files changed, 1135 insertions(+), 1 deletion(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 9412014cef..14af9bac95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.datasketches.kll.KllDoublesSketch;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.common.CustomObject;
@@ -129,7 +130,8 @@ public class ObjectSerDeUtils {
     CovarianceTuple(32),
     VarianceTuple(33),
     PinotFourthMoment(34),
-    ArgMinMaxObject(35);
+    ArgMinMaxObject(35),
+    KllDataSketch(36);
 
     private final int _value;
 
@@ -178,6 +180,8 @@ public class ObjectSerDeUtils {
         return ObjectType.DistinctTable;
       } else if (value instanceof Sketch) {
         return ObjectType.DataSketch;
+      } else if (value instanceof KllDoublesSketch) {
+        return ObjectType.KllDataSketch;
       } else if (value instanceof Geometry) {
         return ObjectType.Geometry;
       } else if (value instanceof RoaringBitmap) {
@@ -922,6 +926,26 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() {
+    // Bytes are the sketch's own binary form; deserialization wraps them via KllDoublesSketch.wrap (no heap copy)
+    @Override
+    public byte[] serialize(KllDoublesSketch sketch) {
+      return sketch.toByteArray();
+    }
+
+    @Override
+    public KllDoublesSketch deserialize(byte[] bytes) {
+      return KllDoublesSketch.wrap(Memory.wrap(bytes));
+    }
+
+    @Override
+    public KllDoublesSketch deserialize(ByteBuffer byteBuffer) {
+      byte[] remaining = new byte[byteBuffer.remaining()];
+      byteBuffer.get(remaining);
+      return deserialize(remaining);
+    }
+  };
+
   public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new ObjectSerDe<Geometry>() {
 
     @Override
@@ -1273,6 +1297,7 @@ public class ObjectSerDeUtils {
       VARIANCE_TUPLE_OBJECT_SER_DE,
       PINOT_FOURTH_MOMENT_OBJECT_SER_DE,
       ARG_MIN_MAX_OBJECT_SER_DE,
+      KLL_SKETCH_SER_DE,
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index ba3fc837c4..418b864400 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -55,6 +55,17 @@ public class AggregationFunctionFactory {
         if (remainingFunctionName.equals("SMARTTDIGEST")) {
           return new PercentileSmartTDigestAggregationFunction(arguments);
         }
+        if (remainingFunctionName.contains("KLL")) {
+          if (remainingFunctionName.equals("KLL")) {
+            return new PercentileKLLAggregationFunction(arguments);
+          } else if (remainingFunctionName.equals("KLLMV")) {
+            return new PercentileKLLMVAggregationFunction(arguments);
+          } else if (remainingFunctionName.equals("RAWKLL")) {
+            return new PercentileRawKLLAggregationFunction(arguments);
+          } else if (remainingFunctionName.equals("RAWKLLMV")) {
+            return new PercentileRawKLLMVAggregationFunction(arguments);
+          }
+        }
         int numArguments = arguments.size();
         if (numArguments == 1) {
           // Single argument percentile (e.g. Percentile99(foo), PercentileTDigest95(bar), etc.)
@@ -77,6 +88,14 @@ public class AggregationFunctionFactory {
             // PercentileRawTDigest
             String percentileString = remainingFunctionName.substring(10);
             return new PercentileRawTDigestAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("KLL\\d+")) {
+            // PercentileKLL
+            String percentileString = remainingFunctionName.substring(3);
+            return new PercentileKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWKLL\\d+")) {
+            // PercentileRawKLL
+            String percentileString = remainingFunctionName.substring(6);
+            return new PercentileRawKLLAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
           } else if (remainingFunctionName.matches("\\d+MV")) {
             // PercentileMV
             String percentileString = remainingFunctionName.substring(0, remainingFunctionName.length() - 2);
@@ -97,6 +116,14 @@ public class AggregationFunctionFactory {
             // PercentileRawTDigestMV
             String percentileString = remainingFunctionName.substring(10, remainingFunctionName.length() - 2);
             return new PercentileRawTDigestMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("KLL\\d+MV")) {
+            // PercentileKLLMV
+            String percentileString = remainingFunctionName.substring(3, remainingFunctionName.length() - 2);
+            return new PercentileKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
+          } else if (remainingFunctionName.matches("RAWKLL\\d+MV")) {
+            // PercentileRawKLLMV
+            String percentileString = remainingFunctionName.substring(6, remainingFunctionName.length() - 2);
+            return new PercentileRawKLLMVAggregationFunction(firstArgument, parsePercentileToInt(percentileString));
           }
         } else if (numArguments == 2) {
           // Double arguments percentile (e.g. percentile(foo, 99), percentileTDigest(bar, 95), etc.) where the
@@ -123,6 +150,14 @@ public class AggregationFunctionFactory {
             // PercentileRawTDigest
             return new PercentileRawTDigestAggregationFunction(firstArgument, percentile);
           }
+          if (remainingFunctionName.equals("KLL")) {
+            // PercentileKLL
+            return new PercentileKLLAggregationFunction(firstArgument, percentile);
+          }
+          if (remainingFunctionName.equals("RAWKLL")) {
+            // PercentileRawKLL
+            return new PercentileRawKLLAggregationFunction(firstArgument, percentile);
+          }
           if (remainingFunctionName.equals("MV")) {
             // PercentileMV
             return new PercentileMVAggregationFunction(firstArgument, percentile);
@@ -143,6 +178,14 @@ public class AggregationFunctionFactory {
             // PercentileRawTDigestMV
             return new PercentileRawTDigestMVAggregationFunction(firstArgument, percentile);
           }
+          if (remainingFunctionName.equals("KLLMV")) {
+            // PercentileKLLMV
+            return new PercentileKLLMVAggregationFunction(firstArgument, percentile);
+          }
+          if (remainingFunctionName.equals("RAWKLLMV")) {
+            // PercentileRawKLLMV
+            return new PercentileRawKLLMVAggregationFunction(firstArgument, percentile);
+          }
         } else if (numArguments == 3) {
           // Triple arguments percentile (e.g. percentileTDigest(bar, 95, 1000), etc.) where the
           // second argument is a decimal number from 0.0 to 100.0 and third argument is a decimal number indicating
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java
new file mode 100644
index 0000000000..3584aafa41
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * <p>
+ *  {@code PercentileKLLAggregationFunction} provides an approximate percentile calculator using the KLL algorithm
+ *  from <a href="https://datasketches.apache.org/docs/KLL/KLLSketch.html">Apache DataSketches library</a>.
+ * </p>
+ * <p>
+ *  The interface is similar to plain 'Percentile' function except for the optional K value which determines
+ *  the size, hence the accuracy of the sketch.
+ * </p>
+ * <p><b>PERCENTILE_KLL(col, percentile, kValue)</b></p>
+ * <p>{@code percentile} must be within [0, 100]; {@code kValue} defaults to 200 when omitted.</p>
+ * <p>E.g.:</p>
+ * <ul>
+ *   <li><b>PERCENTILE_KLL(col, 90)</b></li>
+ *   <li><b>PERCENTILE_KLL(col, 99.9, 800)</b></li>
+ * </ul>
+ *
+ * <p>
+ *   If the column type is BYTES, the aggregation function will assume it is a serialized KllDoublesSketch and will
+ *   attempt to deserialize it for further processing.
+ * </p>
+ *
+ * <p>
+ *   There is a variation of the function (<b>PERCENTILE_RAW_KLL</b>) that returns the Base64 encoded
+ *   sketch object to be used externally.
+ * </p>
+ */
+public class PercentileKLLAggregationFunction
+    extends BaseSingleInputAggregationFunction<KllDoublesSketch, Comparable> {
+
+  protected final double _percentile;
+  protected int _kValue = 200; // size of the sketch. This is the default size used by DataSketches lib as well
+
+  /**
+   * Creates the function from the query arguments {@code (column, percentile[, kValue])}.
+   *
+   * @param arguments the expression to aggregate, the percentile literal and an optional integer K literal
+   * @throws IllegalArgumentException if there are not 2 or 3 arguments, or the percentile is outside [0, 100]
+   */
+  public PercentileKLLAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+
+    // Check that there are correct number of arguments
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments == 2 || numArguments == 3,
+        "Expecting 2 or 3 arguments for PercentileKLL function: "
+            + "PERCENTILE_KLL(column, percentile, k=200)");
+
+    _percentile = arguments.get(1).getLiteral().getDoubleValue();
+    Preconditions.checkArgument(_percentile >= 0 && _percentile <= 100,
+        "Percentile value needs to be in range 0-100, inclusive");
+    if (numArguments == 3) {
+      _kValue = arguments.get(2).getLiteral().getIntValue();
+    }
+  }
+
+  /**
+   * Creates the function for the given expression and percentile, using the default K value.
+   *
+   * @param expression the expression to aggregate
+   * @param percentile the percentile to compute, within [0, 100]
+   * @throws IllegalArgumentException if the percentile is outside [0, 100]
+   */
+  public PercentileKLLAggregationFunction(ExpressionContext expression, double percentile) {
+    super(expression);
+    Preconditions.checkArgument(percentile >= 0 && percentile <= 100,
+        "Percentile value needs to be in range 0-100, inclusive");
+    _percentile = percentile;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILEKLL;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Aggregates {@code length} rows into a single sketch. BYTES values are deserialized and merged as sketches;
+   * values of any other type are read as doubles and added to the sketch.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+    KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+    if (valueType == DataType.BYTES) {
+      // Assuming the column contains serialized data sketches
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        sketch.merge(deserializedSketches[i]);
+      }
+    } else {
+      double[] values = valueSet.getDoubleValuesSV();
+      for (int i = 0; i < length; i++) {
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Same as {@link #aggregate}, but each row is added to the sketch of its (single) group key.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+
+    if (valueType == DataType.BYTES) {
+      // serialized sketch
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.merge(deserializedSketches[i]);
+      }
+    } else {
+      double[] values = valueSet.getDoubleValuesSV();
+      for (int i = 0; i < length; i++) {
+        KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.update(values[i]);
+      }
+    }
+  }
+
+  /**
+   * Same as {@link #aggregate}, but each row is added to the sketch of every group key it belongs to.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+
+    if (valueType == DataType.BYTES) {
+      // serialized sketch
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.merge(deserializedSketches[i]);
+        }
+      }
+    } else {
+      double[] values = valueSet.getDoubleValuesSV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.update(values[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the sketch from the result holder or creates a new one if it does not exist.
+   */
+  protected KllDoublesSketch getOrCreateSketch(AggregationResultHolder aggregationResultHolder) {
+    KllDoublesSketch sketch = aggregationResultHolder.getResult();
+    if (sketch == null) {
+      sketch = KllDoublesSketch.newHeapInstance(_kValue);
+      aggregationResultHolder.setValue(sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Extracts the sketch from the group by result holder for key
+   * or creates a new one if it does not exist.
+   */
+  protected KllDoublesSketch getOrCreateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+    KllDoublesSketch sketch = groupByResultHolder.getResult(groupKey);
+    if (sketch == null) {
+      sketch = KllDoublesSketch.newHeapInstance(_kValue);
+      groupByResultHolder.setValueForKey(groupKey, sketch);
+    }
+    return sketch;
+  }
+
+  /**
+   * Deserializes the sketches from the bytes.
+   */
+  protected KllDoublesSketch[] deserializeSketches(byte[][] serializedSketches) {
+    KllDoublesSketch[] sketches = new KllDoublesSketch[serializedSketches.length];
+    for (int i = 0; i < serializedSketches.length; i++) {
+      sketches[i] = KllDoublesSketch.wrap(Memory.wrap(serializedSketches[i]));
+    }
+    return sketches;
+  }
+
+  /**
+   * Returns the aggregated sketch, or a new empty sketch if nothing was aggregated (i.e. {@link #aggregate} was
+   * never called), so that downstream merge/serialization never sees {@code null}.
+   */
+  @Override
+  public KllDoublesSketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    KllDoublesSketch sketch = aggregationResultHolder.getResult();
+    return sketch != null ? sketch : KllDoublesSketch.newHeapInstance(_kValue);
+  }
+
+  /**
+   * Returns the sketch for the group key, or a new empty sketch if none has been created for it.
+   */
+  @Override
+  public KllDoublesSketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    KllDoublesSketch sketch = groupByResultHolder.getResult(groupKey);
+    return sketch != null ? sketch : KllDoublesSketch.newHeapInstance(_kValue);
+  }
+
+  /**
+   * Merges two (possibly {@code null}) sketches into a new heap sketch with this function's K value. A fresh sketch
+   * is used because the inputs may be wrapped around serialized bytes ({@code KllDoublesSketch.wrap}), i.e. read-only.
+   */
+  @Override
+  public KllDoublesSketch merge(KllDoublesSketch sketch1, KllDoublesSketch sketch2) {
+    KllDoublesSketch union = KllDoublesSketch.newHeapInstance(_kValue);
+    if (sketch1 != null) {
+      union.merge(sketch1);
+    }
+    if (sketch2 != null) {
+      union.merge(sketch2);
+    }
+    return union;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.DOUBLE;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.PERCENTILEKLL.getName().toLowerCase()
+        + "(" + _expression + ", " + _percentile + ")";
+  }
+
+  /**
+   * Returns the configured percentile of the sketch, or {@link Double#NaN} if the sketch is empty (some
+   * DataSketches releases throw from {@code getQuantile} on an empty sketch rather than returning NaN).
+   */
+  @Override
+  public Comparable extractFinalResult(KllDoublesSketch sketch) {
+    if (sketch.isEmpty()) {
+      return Double.NaN;
+    }
+    return sketch.getQuantile(_percentile / 100);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java
new file mode 100644
index 0000000000..61d1bd0bce
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLMVAggregationFunction.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Multi-value variant of {@link PercentileKLLAggregationFunction}: every value of every multi-value row is added
+ * to the sketch. BYTES columns are still read as single-value serialized sketches and merged.
+ */
+public class PercentileKLLMVAggregationFunction extends PercentileKLLAggregationFunction {
+  public PercentileKLLMVAggregationFunction(ExpressionContext expression, double percentile) {
+    super(expression, percentile);
+  }
+
+  public PercentileKLLMVAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+  }
+
+  /**
+   * Adds every value of each multi-value row (or merges each serialized sketch for BYTES) into a single sketch.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+    KllDoublesSketch sketch = getOrCreateSketch(aggregationResultHolder);
+
+    if (valueType == DataType.BYTES) {
+      // Assuming the column contains serialized data sketches
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        sketch.merge(deserializedSketches[i]);
+      }
+    } else {
+      double[][] values = valueSet.getDoubleValuesMV();
+      for (int i = 0; i < length; i++) {
+        for (double val : values[i]) {
+          sketch.update(val);
+        }
+      }
+    }
+  }
+
+  /**
+   * Same as {@link #aggregate}, but the values of each row go to the sketch of its (single) group key.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+
+    if (valueType == DataType.BYTES) {
+      // serialized sketch
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        sketch.merge(deserializedSketches[i]);
+      }
+    } else {
+      double[][] values = valueSet.getDoubleValuesMV();
+      for (int i = 0; i < length; i++) {
+        KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKeyArray[i]);
+        for (double val : values[i]) {
+          sketch.update(val);
+        }
+      }
+    }
+  }
+
+  /**
+   * Same as {@link #aggregate}, but the values of each row go to the sketch of every group key of that row.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet valueSet = blockValSetMap.get(_expression);
+    DataType valueType = valueSet.getValueType();
+
+    if (valueType == DataType.BYTES) {
+      // serialized sketch
+      KllDoublesSketch[] deserializedSketches = deserializeSketches(valueSet.getBytesValuesSV());
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          sketch.merge(deserializedSketches[i]);
+        }
+      }
+    } else {
+      double[][] values = valueSet.getDoubleValuesMV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          KllDoublesSketch sketch = getOrCreateSketch(groupByResultHolder, groupKey);
+          for (double val : values[i]) {
+            sketch.update(val);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILEKLLMV;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.PERCENTILEKLLMV.getName().toLowerCase()
+        + "(" + _expression + ", " + _percentile + ")";
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java
new file mode 100644
index 0000000000..48bd421ee3
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLAggregationFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedKLL;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Variant of {@link PercentileKLLAggregationFunction} whose final result is a {@link SerializedKLL} built from the
+ * final sketch and the requested percentile, reported as a STRING column instead of the percentile value.
+ */
+public class PercentileRawKLLAggregationFunction extends PercentileKLLAggregationFunction {
+
+  /** Creates the function from the query arguments {@code (column, percentile[, kValue])}. */
+  public PercentileRawKLLAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+  }
+
+  /** Creates the function for the given expression and percentile, using the default K value. */
+  public PercentileRawKLLAggregationFunction(ExpressionContext expression, double percentile) {
+    super(expression, percentile);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWKLL;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.PERCENTILERAWKLL.getName().toLowerCase() + "(" + _expression + ", "
+        + _percentile + ")";
+  }
+
+  /** The final result is the serialized sketch, hence a STRING column instead of DOUBLE. */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  /** Wraps the final sketch and the requested percentile into a {@link SerializedKLL}. */
+  @Override
+  public SerializedKLL extractFinalResult(KllDoublesSketch sketch) {
+    return new SerializedKLL(sketch, _percentile);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java
new file mode 100644
index 0000000000..3c0885e970
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileRawKLLMVAggregationFunction.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.SerializedKLL;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Multi-value counterpart of {@link PercentileRawKLLAggregationFunction}: values are aggregated as in
+ * {@link PercentileKLLMVAggregationFunction}, and the final result is a {@link SerializedKLL} (STRING column).
+ */
+public class PercentileRawKLLMVAggregationFunction extends PercentileKLLMVAggregationFunction {
+
+  /** Creates the function from the query arguments {@code (column, percentile[, kValue])}. */
+  public PercentileRawKLLMVAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+  }
+
+  /** Creates the function for the given expression and percentile, using the default K value. */
+  public PercentileRawKLLMVAggregationFunction(ExpressionContext expression, double percentile) {
+    super(expression, percentile);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.PERCENTILERAWKLLMV;
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.PERCENTILERAWKLLMV.getName().toLowerCase() + "(" + _expression + ", "
+        + _percentile + ")";
+  }
+
+  /** The final result is the serialized sketch, hence a STRING column instead of DOUBLE. */
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  /** Wraps the final sketch and the requested percentile into a {@link SerializedKLL}. */
+  @Override
+  public SerializedKLL extractFinalResult(KllDoublesSketch sketch) {
+    return new SerializedKLL(sketch, _percentile);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index a0e10869e0..a4eaa1f991 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -281,6 +281,12 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILERAWTDIGEST);
     assertEquals(aggregationFunction.getResultColumnName(), "percentilerawtdigest(column, 99.9999)");
 
+    function = getFunction("PeRcEntiLEkll", "(column, 99.9999)");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof PercentileKLLAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.PERCENTILEKLL);
+    assertEquals(aggregationFunction.getResultColumnName(), "percentilekll(column, 99.9999)");
+
     function = getFunction("PeRcEnTiLeRaWtDiGeSt", "(column, 99.9999, 500)");
     aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof PercentileRawTDigestAggregationFunction);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
index fdd9a0ed65..65480aa95f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationMultiValueRawQueriesTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.queries;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import org.apache.calcite.avatica.util.Base64;
+import org.apache.datasketches.kll.KllDoublesSketch;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -41,6 +44,8 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
 
   // Allow 5% quantile error due to the randomness of TDigest merge
   private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE;
+  // Allow 2% quantile error due to the randomness of KLL merge
+  private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE;
 
   @Test
   public void testCountMV() {
@@ -540,6 +545,42 @@ public class InterSegmentAggregationMultiValueRawQueriesTest extends BaseMultiVa
         getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA);
   }
 
+  @Test
+  public void testPercentileRawKLLMV() {
+    // Run the raw-KLL MV checks at the 50th, 90th, 95th and 99th percentiles, in that order
+    for (int percentile : new int[]{50, 90, 95, 99}) {
+      testPercentileRawKLLMV(percentile);
+    }
+  }
+
+  private void testPercentileRawKLLMV(int percentile) {
+    // Raw KLL results are Base64-encoded sketches: decode them and evaluate the same percentile for comparison
+    Function<Object, Object> quantileExtractor = value -> {
+      try {
+        KllDoublesSketch sketch =
+            (KllDoublesSketch) ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize(Base64.decode((String) value));
+        return sketch.getQuantile(percentile / 100.0);
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to decode raw KLL sketch", e);
+      }
+    };
+
+    String rawKllQuery = String.format("SELECT PERCENTILERAWKLL%dMV(column6) AS value FROM testTable", percentile);
+    String regularQuery = String.format("SELECT PERCENTILE%dMV(column6) AS value FROM testTable", percentile);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery), getBrokerResponse(regularQuery),
+        quantileExtractor, PERCENTILE_KLL_DELTA);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER),
+        getBrokerResponse(regularQuery + FILTER), quantileExtractor, PERCENTILE_KLL_DELTA);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + SV_GROUP_BY),
+        getBrokerResponse(regularQuery + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + SV_GROUP_BY),
+        getBrokerResponse(regularQuery + FILTER + SV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + MV_GROUP_BY),
+        getBrokerResponse(regularQuery + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+    QueriesTestUtils.testInterSegmentsResult(getBrokerResponse(rawKllQuery + FILTER + MV_GROUP_BY),
+        getBrokerResponse(regularQuery + FILTER + MV_GROUP_BY), quantileExtractor, PERCENTILE_KLL_DELTA);
+  }
+
   @Test
   public void testNumGroupsLimit() {
     String query = "SELECT COUNT(*) FROM testTable GROUP BY column6";
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
index 5fcad1307d..a6e8320962 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentAggregationSingleValueQueriesTest.java
@@ -40,6 +40,8 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
 
   // Allow 5% quantile error due to the randomness of TDigest merge
   private static final double PERCENTILE_TDIGEST_DELTA = 0.05 * Integer.MAX_VALUE;
+  // Allow 2% quantile error due to the randomness of KLL merge
+  private static final double PERCENTILE_KLL_DELTA = 0.02 * Integer.MAX_VALUE;
 
   @Test
   public void testCount() {
@@ -589,6 +591,115 @@ public class InterSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         getBrokerResponse(regularQuery + FILTER + GROUP_BY), quantileExtractor, PERCENTILE_TDIGEST_DELTA);
   }
 
+  @Test
+  public void testPercentileKLL() {
+    String query = "SELECT PERCENTILEKLL(column1, 50) AS v1, PERCENTILEKLL(column3, 50) AS v2 FROM testTable";
+    // Each percentile is checked plain, with FILTER, with GROUP_BY and with both, within PERCENTILE_KLL_DELTA
+    BrokerResponseNative brokerResponse = getBrokerResponse(query);
+    DataSchema expectedDataSchema =
+        new DataSchema(new String[]{"v1", "v2"}, new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    ResultTable expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1107310944L, 1082130431L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1139674505L, 509607935L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+    // NOTE(review): GROUP_BY expectations barely vary across percentiles (tiny top group?); confirm intended
+    brokerResponse = getBrokerResponse(query + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    query = "SELECT PERCENTILEKLL(column1, 90) AS v1, PERCENTILEKLL(column3, 90) AS v2 FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1946157055L, 1946157055L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{1939865599L, 902299647L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    query = "SELECT PERCENTILEKLL(column1, 95) AS v1, PERCENTILEKLL(column3, 95) AS v2 FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2080374783L, 2051014655L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2109734911L, 950009855L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2142595699L, 334963174L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    query = "SELECT PERCENTILEKLL(column1, 99) AS v1, PERCENTILEKLL(column3, 99) AS v2 FROM testTable";
+
+    brokerResponse = getBrokerResponse(query);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2143289343L, 2143289343L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        240000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 991952895L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        49032L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146791843L, 1418523221L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 120000L, 0L,
+        360000L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+
+    brokerResponse = getBrokerResponse(query + FILTER + GROUP_BY);
+    expectedResultTable =
+        new ResultTable(expectedDataSchema, Collections.singletonList(new Object[]{2146232405L, 993001471L}));
+    QueriesTestUtils.testInterSegmentsResult(brokerResponse, 24516L, 252256L,
+        73548L, 120000L, expectedResultTable, PERCENTILE_KLL_DELTA);
+  }
+
   @Test
   public void testNumGroupsLimit() {
     String query = "SELECT COUNT(*) FROM testTable GROUP BY column1";
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java
new file mode 100644
index 0000000000..40a8b9046d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLMVQueriesTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/**
+ * Variation of the PercentileKLLQueriesTest suite which tests PERCENTILE_KLL_MV on a multi-value double column.
+ */
+public class PercentileKLLMVQueriesTest extends PercentileKLLQueriesTest {
+  private static final int MAX_NUM_MULTI_VALUES = 10;
+
+  @Override
+  protected void buildSegment()
+      throws Exception {
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = new GenericRow();
+      // 1..MAX_NUM_MULTI_VALUES random doubles; the KLL column holds a sketch of exactly these values
+      int numMultiValues = RANDOM.nextInt(MAX_NUM_MULTI_VALUES) + 1;
+      Double[] values = new Double[numMultiValues];
+      KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance();
+      for (int j = 0; j < numMultiValues; j++) {
+        double value = RANDOM.nextDouble() * VALUE_RANGE;
+        values[j] = value;
+        sketch.update(value);
+      }
+      row.putValue(DOUBLE_COLUMN, values);
+      row.putValue(KLL_COLUMN, sketch.toByteArray());
+
+      String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
+      row.putValue(GROUP_BY_COLUMN, group);
+
+      rows.add(row);
+    }
+    // Multi-value DOUBLE column, BYTES sketch column with a raw index, single-value group-by column
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE, false));
+    schema.addField(new MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+    config.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN));
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+  // Same six result positions as the base test, using the MV function forms on the multi-value double column
+  @Override
+  protected String getAggregationQuery(int percentile) {
+    return String.format("SELECT PERCENTILE%1$dMV(%2$s), PERCENTILEKLL%1$dMV(%2$s), PERCENTILEKLL%1$d(%3$s), "
+            + "PERCENTILEMV(%2$s, %1$d), PERCENTILEKLLMV(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s",
+        percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java
new file mode 100644
index 0000000000..05e626e4b4
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileKLLQueriesTest.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import it.unimi.dsi.fastutil.doubles.DoubleList;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.datasketches.kll.KllDoublesSketch;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Tests for PERCENTILE_KLL aggregation function.
+ *
+ * <ul>
+ *   <li>Generates a segment with a double column, a KLL column and a group-by column</li>
+ *   <li>Runs aggregation and group-by queries on the generated segment</li>
+ *   <li>
+ *     Compares the results for PERCENTILE_KLL on double column and KLL column with results for PERCENTILE on
+ *     double column
+ *   </li>
+ * </ul>
+ */
+public class PercentileKLLQueriesTest extends BaseQueriesTest {
+  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileKllQueriesTest");
+  protected static final String TABLE_NAME = "testTable";
+  protected static final String SEGMENT_NAME = "testSegment";
+  // Data generation settings and tolerance; ERROR_MESSAGE carries RANDOM_SEED so assertion failures report it
+  protected static final int NUM_ROWS = 1000;
+  protected static final int VALUE_RANGE = Integer.MAX_VALUE;
+  protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% delta
+  protected static final String DOUBLE_COLUMN = "doubleColumn";
+  protected static final String KLL_COLUMN = "kllColumn";
+  protected static final String GROUP_BY_COLUMN = "groupByColumn";
+  protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"};
+  protected static final long RANDOM_SEED = System.nanoTime();
+  protected static final Random RANDOM = new Random(RANDOM_SEED);
+  protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
+  // _indexSegments repeats the single loaded segment (see setUp), presumably so broker queries merge two copies
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return ""; // No filtering required for this test.
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+  // Rebuilds INDEX_DIR. NOTE(review): INDEX_DIR is shared with subclasses; don't run them in parallel
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+  // One row per iteration: a random double, a KLL sketch holding just that value, and a random group
+  protected void buildSegment()
+      throws Exception {
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = new GenericRow();
+
+      double value = RANDOM.nextDouble() * VALUE_RANGE;
+      row.putValue(DOUBLE_COLUMN, value);
+
+      KllDoublesSketch sketch = KllDoublesSketch.newHeapInstance();
+      sketch.update(value);
+      row.putValue(KLL_COLUMN, sketch.toByteArray());
+
+      String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
+      row.putValue(GROUP_BY_COLUMN, group);
+
+      rows.add(row);
+    }
+    // KLL_COLUMN is a BYTES metric stored with a raw index (see setRawIndexCreationColumns below)
+    Schema schema = new Schema();
+    schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE));
+    schema.addField(new MetricFieldSpec(KLL_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+    config.setRawIndexCreationColumns(Collections.singletonList(KLL_COLUMN));
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
+  }
+  // Columns: 0/3 exact PERCENTILE, 1/4 PERCENTILEKLL on DOUBLE_COLUMN, 2/5 PERCENTILEKLL on KLL_COLUMN
+  @Test
+  public void testInnerSegmentAggregation() {
+    // For inner segment case, percentile does not affect the intermediate result
+    AggregationOperator aggregationOperator = getOperator(getAggregationQuery(0));
+    AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    List<Object> aggregationResult = resultsBlock.getResults();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 6);
+    DoubleList doubleList0 = (DoubleList) aggregationResult.get(0);
+    Collections.sort(doubleList0);
+    assertSketch((KllDoublesSketch) aggregationResult.get(1), doubleList0);
+    assertSketch((KllDoublesSketch) aggregationResult.get(2), doubleList0);
+
+    DoubleList doubleList3 = (DoubleList) aggregationResult.get(3);
+    Collections.sort(doubleList3);
+    assertEquals(doubleList3, doubleList0);
+    assertSketch((KllDoublesSketch) aggregationResult.get(4), doubleList0);
+    assertSketch((KllDoublesSketch) aggregationResult.get(5), doubleList0);
+  }
+  // Columns 1..5 must be within DELTA of column 0 (exact PERCENTILE) for every percentile 0..100
+  @Test
+  public void testInterSegmentAggregation() {
+    for (int percentile = 0; percentile <= 100; percentile++) {
+      BrokerResponseNative brokerResponse = getBrokerResponse(getAggregationQuery(percentile));
+      Object[] results = brokerResponse.getResultTable().getRows().get(0);
+      assertEquals(results.length, 6);
+      double expectedResult = (Double) results[0];
+      for (int i = 1; i < 6; i++) {
+        assertEquals((Double) results[i], expectedResult, DELTA, ERROR_MESSAGE);
+      }
+    }
+  }
+  // Per-group version of testInnerSegmentAggregation, covering every group key of the GROUP BY query
+  @Test
+  public void testInnerSegmentGroupBy() {
+    // For inner segment case, percentile does not affect the intermediate result
+    GroupByOperator groupByOperator = getOperator(getGroupByQuery(0));
+    GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+    AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      int groupId = groupKeyIterator.next()._groupId;
+      DoubleList doubleList0 = (DoubleList) groupByResult.getResultForGroupId(0, groupId);
+      Collections.sort(doubleList0);
+      assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(1, groupId), doubleList0);
+      assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(2, groupId), doubleList0);
+
+      DoubleList doubleList3 = (DoubleList) groupByResult.getResultForGroupId(3, groupId);
+      Collections.sort(doubleList3);
+      assertEquals(doubleList3, doubleList0);
+      assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(4, groupId), doubleList0);
+      assertSketch((KllDoublesSketch) groupByResult.getResultForGroupId(5, groupId), doubleList0);
+    }
+  }
+  // Expects one row per group (3 groups); columns 1..5 must be within DELTA of column 0 in each row
+  @Test
+  public void testInterSegmentGroupBy() {
+    for (int percentile = 0; percentile <= 100; percentile++) {
+      BrokerResponseNative brokerResponse = getBrokerResponse(getGroupByQuery(percentile));
+      List<Object[]> rows = brokerResponse.getResultTable().getRows();
+      assertEquals(rows.size(), 3);
+      for (Object[] row : rows) {
+        assertEquals(row.length, 6);
+        double expectedResult = (Double) row[0];
+        for (int i = 1; i < 6; i++) {
+          assertEquals((Double) row[i], expectedResult, DELTA, ERROR_MESSAGE);
+        }
+      }
+    }
+  }
+  // protected so subclasses can swap function variants; the tests read the six result columns by position
+  protected String getAggregationQuery(int percentile) {
+    return String.format("SELECT PERCENTILE%1$d(%2$s), PERCENTILEKLL%1$d(%2$s), PERCENTILEKLL%1$d(%3$s), "
+            + "PERCENTILE(%2$s, %1$d), PERCENTILEKLL(%2$s, %1$d), PERCENTILEKLL(%3$s, %1$d) FROM %4$s",
+        percentile, DOUBLE_COLUMN, KLL_COLUMN, TABLE_NAME);
+  }
+
+  private String getGroupByQuery(int percentile) {
+    return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN);
+  }
+  // Checks percentiles 0..100 of the sketch against doubleList, which callers must sort ascending first
+  private void assertSketch(KllDoublesSketch sketch, DoubleList doubleList) {
+    for (int percentile = 0; percentile <= 100; percentile++) {
+      double expected;
+      if (percentile == 100) {
+        expected = doubleList.getDouble(doubleList.size() - 1);
+      } else {
+        expected = doubleList.getDouble(doubleList.size() * percentile / 100);
+      }
+      assertEquals(sketch.getQuantile(percentile / 100.0), expected, DELTA, ERROR_MESSAGE);
+    }
+  }
+  // _indexSegments only repeats _indexSegment, so destroying it once releases the loaded segment
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java
new file mode 100644
index 0000000000..f31f7a198d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/SerializedKLL.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.Base64;
+import org.apache.datasketches.kll.KllDoublesSketch;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A {@link KllDoublesSketch} paired with a percentile; instances are ordered by the sketch's estimate at it.
+ */
+public class SerializedKLL implements Comparable<SerializedKLL> {
+  private final KllDoublesSketch _sketch;
+  private final double _quantile; // percentile / 100, i.e. the normalized rank passed to getQuantile
+
+  public SerializedKLL(KllDoublesSketch sketch, double percentile) {
+    _quantile = percentile / 100.0;
+    _sketch = sketch;
+  }
+
+  @Override
+  public int compareTo(SerializedKLL other) {
+    checkArgument(_quantile == other._quantile, "Quantile numbers don't match");
+    double thisEstimate = _sketch.getQuantile(_quantile);
+    return Double.compare(thisEstimate, other._sketch.getQuantile(_quantile));
+  }
+
+  @Override
+  public String toString() { // Base64 encoding of the serialized sketch bytes
+    return Base64.getEncoder().encodeToString(_sketch.toByteArray());
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 0a2a22dd00..a9d2085c8f 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -58,6 +58,8 @@ public enum AggregationFunctionType {
   PERCENTILETDIGEST("percentileTDigest"),
   PERCENTILERAWTDIGEST("percentileRawTDigest"),
   PERCENTILESMARTTDIGEST("percentileSmartTDigest"),
+  PERCENTILEKLL("percentileKLL"),
+  PERCENTILERAWKLL("percentileRawKLL"),
   IDSET("idSet"),
   HISTOGRAM("histogram"),
   COVARPOP("covarPop"),
@@ -91,6 +93,8 @@ public enum AggregationFunctionType {
   PERCENTILERAWESTMV("percentileRawEstMV"),
   PERCENTILETDIGESTMV("percentileTDigestMV"),
   PERCENTILERAWTDIGESTMV("percentileRawTDigestMV"),
+  PERCENTILEKLLMV("percentileKLLMV"),
+  PERCENTILERAWKLLMV("percentileRawKLLMV"),
   DISTINCT("distinct"),
 
   // boolean aggregate functions
@@ -151,6 +155,10 @@ public enum AggregationFunctionType {
         return PERCENTILETDIGEST;
       } else if (remainingFunctionName.equals("RAWTDIGEST") || remainingFunctionName.matches("RAWTDIGEST\\d+")) {
         return PERCENTILERAWTDIGEST;
+      } else if (remainingFunctionName.equals("KLL") || remainingFunctionName.matches("KLL\\d+")) {
+        return PERCENTILEKLL;
+      } else if (remainingFunctionName.equals("RAWKLL") || remainingFunctionName.matches("RAWKLL\\d+")) {
+        return PERCENTILERAWKLL;
       } else if (remainingFunctionName.equals("MV") || remainingFunctionName.matches("\\d+MV")) {
         return PERCENTILEMV;
       } else if (remainingFunctionName.equals("ESTMV") || remainingFunctionName.matches("EST\\d+MV")) {
@@ -161,6 +169,10 @@ public enum AggregationFunctionType {
         return PERCENTILETDIGESTMV;
       } else if (remainingFunctionName.equals("RAWTDIGESTMV") || remainingFunctionName.matches("RAWTDIGEST\\d+MV")) {
         return PERCENTILERAWTDIGESTMV;
+      } else if (remainingFunctionName.equals("KLLMV") || remainingFunctionName.matches("KLL\\d+MV")) {
+        return PERCENTILEKLLMV;
+      } else if (remainingFunctionName.equals("RAWKLLMV") || remainingFunctionName.matches("RAWKLL\\d+MV")) {
+        return PERCENTILERAWKLLMV;
       } else {
         throw new IllegalArgumentException("Invalid aggregation function name: " + functionName);
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org