Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/16 22:16:15 UTC

[incubator-pinot] 01/01: Support default BYTES (zero-length byte array) in aggregation function and aggregator

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch default_bytes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c8169e409e2217064e42c268077786d21da46134
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu May 16 15:13:57 2019 -0700

    Support default BYTES (zero-length byte array) in aggregation function and aggregator
    
    Treat zero-length byte array as null, and skip processing it
    Add DefaultBytesQueriesTest for test coverage
---
 .../core/data/aggregator/AvgValueAggregator.java   |  15 +-
 .../DistinctCountHLLValueAggregator.java           |  24 ++-
 .../aggregator/MinMaxRangeValueAggregator.java     |  15 +-
 .../aggregator/PercentileEstValueAggregator.java   |  21 ++-
 .../PercentileTDigestValueAggregator.java          |  23 ++-
 .../function/AvgAggregationFunction.java           |  49 ++++--
 .../DistinctCountHLLAggregationFunction.java       |  31 +++-
 .../function/MinMaxRangeAggregationFunction.java   |  51 +++---
 .../function/PercentileEstAggregationFunction.java |  49 ++++--
 .../PercentileTDigestAggregationFunction.java      |  47 ++++--
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |   4 +-
 .../SegmentGenerationWithBytesTypeTest.java        |   2 +-
 ...eriesTest.java => DefaultBytesQueriesTest.java} | 188 ++++++++++++---------
 .../queries/PercentileTDigestMVQueriesTest.java    |   2 +-
 .../queries/PercentileTDigestQueriesTest.java      |   2 +-
 15 files changed, 331 insertions(+), 192 deletions(-)
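
The change applies one pattern across all five value aggregators and aggregation functions: a BYTES value holding a serialized aggregate is deserialized only when the byte array is non-empty, while a zero-length array (the default BYTES value) is treated as null and skipped. Below is a minimal, self-contained sketch of that pattern using hypothetical names (SumCount, deserialize); the actual commit uses the Pinot classes shown in the diffs that follow (AvgPair, ObjectSerDeUtils, and friends).

// Minimal sketch of the zero-length-BYTES handling, with hypothetical names;
// not code from the commit itself.
public final class DefaultBytesSketch {

  /** Stand-in for a serialized aggregate such as AvgPair. */
  static final class SumCount {
    double sum;
    long count;

    void apply(double s, long c) {
      sum += s;
      count += c;
    }
  }

  /** Pretend deserializer: an 8-byte sum followed by an 8-byte count. */
  static SumCount deserialize(byte[] bytes) {
    java.nio.ByteBuffer buffer = java.nio.ByteBuffer.wrap(bytes);
    SumCount result = new SumCount();
    result.apply(buffer.getDouble(), buffer.getLong());
    return result;
  }

  /** Folds a raw column value into the running aggregate. */
  static void applyRawValue(SumCount value, Object rawValue) {
    if (rawValue instanceof byte[]) {
      byte[] bytes = (byte[]) rawValue;
      // Zero-length byte array is the default (null) BYTES value: skip it.
      if (bytes.length != 0) {
        SumCount other = deserialize(bytes);
        value.apply(other.sum, other.count);
      }
    } else {
      // Plain numeric value: aggregate it directly.
      value.apply(((Number) rawValue).doubleValue(), 1L);
    }
  }

  public static void main(String[] args) {
    SumCount value = new SumCount();
    applyRawValue(value, new byte[0]); // default BYTES value, skipped
    applyRawValue(value, 3.0);         // regular value, aggregated
    System.out.println(value.sum + " / " + value.count); // prints "3.0 / 1"
  }
}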

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
index e5841f7..a189105 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.AvgAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 
 
@@ -40,7 +41,13 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
   @Override
   public AvgPair getInitialAggregatedValue(Object rawValue) {
     if (rawValue instanceof byte[]) {
-      return deserializeAggregatedValue((byte[]) rawValue);
+      // Use default value for zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        return deserializeAggregatedValue(bytes);
+      } else {
+        return AvgAggregationFunction.getDefaultAvgPair();
+      }
     } else {
       return new AvgPair(((Number) rawValue).doubleValue(), 1L);
     }
@@ -49,7 +56,11 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
   @Override
   public AvgPair applyRawValue(AvgPair value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.apply(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.apply(deserializeAggregatedValue(bytes));
+      }
     } else {
       value.apply(((Number) rawValue).doubleValue(), 1L);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index 15ad278..5c0eab9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -47,11 +47,17 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
   public HyperLogLog getInitialAggregatedValue(Object rawValue) {
     HyperLogLog initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
+        _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
+      }
     } else {
-      initialValue = new HyperLogLog(DistinctCountHLLAggregationFunction.DEFAULT_LOG2M);
+      initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
       initialValue.offer(rawValue);
       _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
     }
@@ -61,10 +67,14 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
   @Override
   public HyperLogLog applyRawValue(HyperLogLog value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      try {
-        value.addAll(deserializeAggregatedValue((byte[]) rawValue));
-      } catch (CardinalityMergeException e) {
-        throw new RuntimeException(e);
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        try {
+          value.addAll(deserializeAggregatedValue(bytes));
+        } catch (CardinalityMergeException e) {
+          throw new RuntimeException(e);
+        }
       }
     } else {
       value.offer(rawValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
index a6a8d55..2d176df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.MinMaxRangeAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 
 
@@ -40,7 +41,13 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
   @Override
   public MinMaxRangePair getInitialAggregatedValue(Object rawValue) {
     if (rawValue instanceof byte[]) {
-      return deserializeAggregatedValue((byte[]) rawValue);
+      // Use default value for zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        return deserializeAggregatedValue(bytes);
+      } else {
+        return MinMaxRangeAggregationFunction.getDefaultMinMaxRangePair();
+      }
     } else {
       double doubleValue = ((Number) rawValue).doubleValue();
       return new MinMaxRangePair(doubleValue, doubleValue);
@@ -50,7 +57,11 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
   @Override
   public MinMaxRangePair applyRawValue(MinMaxRangePair value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.apply(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.apply(deserializeAggregatedValue(bytes));
+      }
     } else {
       double doubleValue = ((Number) rawValue).doubleValue();
       value.apply(doubleValue, doubleValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
index a57f324..c9a9e8c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
   public QuantileDigest getInitialAggregatedValue(Object rawValue) {
     QuantileDigest initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
+        _maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
+      }
     } else {
-      initialValue = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+      initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
       initialValue.add(((Number) rawValue).longValue());
       _maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
     }
@@ -58,11 +64,16 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
   @Override
   public QuantileDigest applyRawValue(QuantileDigest value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.merge(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.merge(deserializeAggregatedValue(bytes));
+        _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
+      }
     } else {
       value.add(((Number) rawValue).longValue());
+      _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
     }
-    _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
     return value;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
index 9d398eb..545ed7f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
   public TDigest getInitialAggregatedValue(Object rawValue) {
     TDigest initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
+        _maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
+      }
     } else {
-      initialValue = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
       initialValue.add(((Number) rawValue).doubleValue());
       _maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
     }
@@ -58,11 +64,16 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
   @Override
   public TDigest applyRawValue(TDigest value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.add(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.add(deserializeAggregatedValue(bytes));
+        _maxByteSize = Math.max(_maxByteSize, value.byteSize());
+      }
     } else {
-      value.add(((Number) rawValue).doubleValue());
+      value.add(((Number) rawValue).doubleValue());
+      _maxByteSize = Math.max(_maxByteSize, value.byteSize());
     }
-    _maxByteSize = Math.max(_maxByteSize, value.byteSize());
     return value;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 6aef5f0..a56ffbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class AvgAggregationFunction implements AggregationFunction<AvgPair, Double> {
   private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY;
 
+  public static AvgPair getDefaultAvgPair() {
+    return new AvgPair(0.0, 0L);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -71,10 +75,10 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         double sum = 0.0;
         for (int i = 0; i < length; i++) {
-          sum += valueArray[i];
+          sum += doubleValues[i];
         }
         setAggregationResult(aggregationResultHolder, sum, (long) length);
         break;
@@ -84,9 +88,12 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
         sum = 0.0;
         long count = 0L;
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          sum += value.getSum();
-          count += value.getCount();
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            sum += value.getSum();
+            count += value.getCount();
+          }
         }
         setAggregationResult(aggregationResultHolder, sum, count);
         break;
@@ -114,17 +121,20 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, valueArray[i], 1L);
+          setGroupByResult(groupKeyArray[i], groupByResultHolder, doubleValues[i], 1L);
         }
         break;
       case BYTES:
         // Serialized AvgPair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+          }
         }
         break;
       default:
@@ -141,9 +151,9 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             setGroupByResult(groupKey, groupByResultHolder, value, 1L);
           }
@@ -153,11 +163,14 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
         // Serialized AvgPair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          double sum = value.getSum();
-          long count = value.getCount();
-          for (int groupKey : groupKeysArray[i]) {
-            setGroupByResult(groupKey, groupByResultHolder, sum, count);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            double sum = value.getSum();
+            long count = value.getCount();
+            for (int groupKey : groupKeysArray[i]) {
+              setGroupByResult(groupKey, groupByResultHolder, sum, count);
+            }
           }
         }
         break;
@@ -181,7 +194,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
   public AvgPair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     AvgPair avgPair = aggregationResultHolder.getResult();
     if (avgPair == null) {
-      return new AvgPair(0.0, 0L);
+      return getDefaultAvgPair();
     } else {
       return avgPair;
     }
@@ -192,7 +205,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
   public AvgPair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     AvgPair avgPair = groupByResultHolder.getResult(groupKey);
     if (avgPair == null) {
-      return new AvgPair(0.0, 0L);
+      return getDefaultAvgPair();
     } else {
       return avgPair;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index f35709c..25826e0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -34,6 +34,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class DistinctCountHLLAggregationFunction implements AggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
 
+  public static HyperLogLog getDefaultHyperLogLog() {
+    return new HyperLogLog(DEFAULT_LOG2M);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -105,7 +109,10 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -156,8 +163,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
-                ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
+                  ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -208,8 +218,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
-                ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
+                  ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -225,7 +238,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   public HyperLogLog extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return getDefaultHyperLogLog();
     } else {
       return hyperLogLog;
     }
@@ -236,7 +249,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   public HyperLogLog extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return getDefaultHyperLogLog();
     } else {
       return hyperLogLog;
     }
@@ -335,7 +348,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   protected static HyperLogLog getHyperLogLog(@Nonnull AggregationResultHolder aggregationResultHolder) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = getDefaultHyperLogLog();
       aggregationResultHolder.setValue(hyperLogLog);
     }
     return hyperLogLog;
@@ -351,7 +364,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   protected static HyperLogLog getHyperLogLog(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = getDefaultHyperLogLog();
       groupByResultHolder.setValueForKey(groupKey, hyperLogLog);
     }
     return hyperLogLog;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 717817c..84d5e06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -32,6 +32,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 
 public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMaxRangePair, Double> {
 
+  public static MinMaxRangePair getDefaultMinMaxRangePair() {
+    return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -70,11 +74,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         double min = Double.POSITIVE_INFINITY;
         double max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           if (value < min) {
             min = value;
           }
@@ -90,12 +94,15 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         min = Double.POSITIVE_INFINITY;
         max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          if (value.getMin() < min) {
-            min = value.getMin();
-          }
-          if (value.getMax() > max) {
-            max = value.getMax();
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            if (value.getMin() < min) {
+              min = value.getMin();
+            }
+            if (value.getMax() > max) {
+              max = value.getMax();
+            }
           }
         }
         setAggregationResult(aggregationResultHolder, min, max);
@@ -124,9 +131,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value);
         }
         break;
@@ -134,8 +141,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         // Serialized MinMaxRangePair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+          }
         }
         break;
       default:
@@ -152,9 +162,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             setGroupByResult(groupKey, groupByResultHolder, value, value);
           }
@@ -164,9 +174,12 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         // Serialized MinMaxRangePair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          for (int groupKey : groupKeysArray[i]) {
-            setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+            }
           }
         }
         break;
@@ -190,7 +203,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
   public MinMaxRangePair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult();
     if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+      return getDefaultMinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
@@ -201,7 +214,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
   public MinMaxRangePair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey);
     if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+      return getDefaultMinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
index 3c2c91b..6c0b8ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class PercentileEstAggregationFunction implements AggregationFunction<QuantileDigest, Long> {
   public static final double DEFAULT_MAX_ERROR = 0.05;
 
+  public static QuantileDigest getDefaultQuantileDigest() {
+    return new QuantileDigest(DEFAULT_MAX_ERROR);
+  }
+
   protected final int _percentile;
 
   public PercentileEstAggregationFunction(int percentile) {
@@ -79,16 +83,19 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          quantileDigest.add((long) valueArray[i]);
+          quantileDigest.add(longValues[i]);
         }
         break;
       case BYTES:
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          }
         }
         break;
       default:
@@ -105,18 +112,21 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
           QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
-          quantileDigest.add((long) valueArray[i]);
+          quantileDigest.add(longValues[i]);
         }
         break;
       case BYTES:
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
-          quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
+            quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          }
         }
         break;
       default:
@@ -133,12 +143,12 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          long value = longValues[i];
           for (int groupKey : groupKeysArray[i]) {
             QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
-            quantileDigest.add((long) value);
+            quantileDigest.add(value);
           }
         }
         break;
@@ -146,10 +156,13 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
-          for (int groupKey : groupKeysArray[i]) {
-            QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
-            quantileDigest.merge(value);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
+              quantileDigest.merge(value);
+            }
           }
         }
         break;
@@ -163,7 +176,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   public QuantileDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     QuantileDigest quantileDigest = aggregationResultHolder.getResult();
     if (quantileDigest == null) {
-      return new QuantileDigest(DEFAULT_MAX_ERROR);
+      return getDefaultQuantileDigest();
     } else {
       return quantileDigest;
     }
@@ -174,7 +187,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   public QuantileDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
     if (quantileDigest == null) {
-      return new QuantileDigest(DEFAULT_MAX_ERROR);
+      return getDefaultQuantileDigest();
     } else {
       return quantileDigest;
     }
@@ -214,7 +227,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   protected static QuantileDigest getQuantileDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
     QuantileDigest quantileDigest = aggregationResultHolder.getResult();
     if (quantileDigest == null) {
-      quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+      quantileDigest = getDefaultQuantileDigest();
       aggregationResultHolder.setValue(quantileDigest);
     }
     return quantileDigest;
@@ -230,7 +243,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   protected static QuantileDigest getQuantileDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
     if (quantileDigest == null) {
-      quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+      quantileDigest = getDefaultQuantileDigest();
       groupByResultHolder.setValueForKey(groupKey, quantileDigest);
     }
     return quantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
index 0d4db7f..d037535 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
@@ -37,6 +37,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class PercentileTDigestAggregationFunction implements AggregationFunction<TDigest, Double> {
   public static final int DEFAULT_TDIGEST_COMPRESSION = 100;
 
+  public static TDigest getDefaultTDigest() {
+    return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+  }
+
   protected final int _percentile;
 
   public PercentileTDigestAggregationFunction(int percentile) {
@@ -83,16 +87,19 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          tDigest.add(valueArray[i]);
+          tDigest.add(doubleValues[i]);
         }
         break;
       case BYTES:
         // Serialized TDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          }
         }
         break;
       default:
@@ -109,18 +116,21 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
           TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
-          tDigest.add(valueArray[i]);
+          tDigest.add(doubleValues[i]);
         }
         break;
       case BYTES:
         // Serialized TDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
-          tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
+            tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          }
         }
         break;
       default:
@@ -137,9 +147,9 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
             tDigest.add(value);
@@ -149,10 +159,13 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
-          for (int groupKey : groupKeysArray[i]) {
-            TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
-            tDigest.add(value);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
+            for (int groupKey : groupKeysArray[i]) {
+              TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
+              tDigest.add(value);
+            }
           }
         }
         break;
@@ -166,7 +179,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   public TDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     TDigest tDigest = aggregationResultHolder.getResult();
     if (tDigest == null) {
-      return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      return getDefaultTDigest();
     } else {
       return tDigest;
     }
@@ -177,7 +190,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   public TDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     TDigest tDigest = groupByResultHolder.getResult(groupKey);
     if (tDigest == null) {
-      return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      return getDefaultTDigest();
     } else {
       return tDigest;
     }
@@ -229,7 +242,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   protected static TDigest getTDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
     TDigest tDigest = aggregationResultHolder.getResult();
     if (tDigest == null) {
-      tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      tDigest = getDefaultTDigest();
       aggregationResultHolder.setValue(tDigest);
     }
     return tDigest;
@@ -245,7 +258,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   protected static TDigest getTDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     TDigest tDigest = groupByResultHolder.getResult(groupKey);
     if (tDigest == null) {
-      tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      tDigest = getDefaultTDigest();
       groupByResultHolder.setValueForKey(groupKey, tDigest);
     }
     return tDigest;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8350ee6..74515b0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -137,7 +137,7 @@ public class ObjectSerDeUtilsTest {
   @Test
   public void testQuantileDigest() {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
-      QuantileDigest expected = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+      QuantileDigest expected = PercentileEstAggregationFunction.getDefaultQuantileDigest();
       int size = RANDOM.nextInt(100) + 1;
       for (int j = 0; j < size; j++) {
         expected.add(RANDOM.nextLong());
@@ -188,7 +188,7 @@ public class ObjectSerDeUtilsTest {
   @Test
   public void testTDigest() {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
-      TDigest expected = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest expected = PercentileTDigestAggregationFunction.getDefaultTDigest();
       int size = RANDOM.nextInt(100) + 1;
       for (int j = 0; j < size; j++) {
         expected.add(RANDOM.nextDouble());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
index 6d6d810..7749a44 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
@@ -285,7 +285,7 @@ public class SegmentGenerationWithBytesTypeTest {
       for (int i = 0; i < NUM_ROWS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
 
-        TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+        TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
         tDigest.add(_random.nextDouble());
 
         ByteBuffer buffer = ByteBuffer.allocate(tDigest.byteSize());
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
similarity index 55%
copy from pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
index 67ab0fa..1686513 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
@@ -18,17 +18,14 @@
  */
 package org.apache.pinot.queries;
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.tdunning.math.stats.TDigest;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.DimensionFieldSpec;
@@ -51,7 +48,9 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
+import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -60,34 +59,33 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
- * Tests for PERCENTILE_TDIGEST aggregation function.
+ * Tests for default bytes (zero-length byte array) values.
  *
+ * <p>Aggregation functions that support bytes values:
  * <ul>
- *   <li>Generates a segment with a double column, a TDigest column and a group-by column</li>
- *   <li>Runs aggregation and group-by queries on the generated segment</li>
- *   <li>
- *     Compares the results for PERCENTILE_TDIGEST on double column and TDigest column with results for PERCENTILE on
- *     double column
- *   </li>
+ *   <li>AVG</li>
+ *   <li>DISTINCTCOUNTHLL</li>
+ *   <li>MINMAXRANGE</li>
+ *   <li>PERCENTILEEST</li>
+ *   <li>PERCENTILETDIGEST</li>
  * </ul>
  */
-public class PercentileTDigestQueriesTest extends BaseQueriesTest {
-  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileTDigestQueriesTest");
+public class DefaultBytesQueriesTest extends BaseQueriesTest {
+  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DefaultBytesQueriesTest");
   protected static final String TABLE_NAME = "testTable";
   protected static final String SEGMENT_NAME = "testSegment";
 
   protected static final int NUM_ROWS = 1000;
-  protected static final double VALUE_RANGE = Integer.MAX_VALUE;
-  protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% quantile error
-  protected static final String DOUBLE_COLUMN = "doubleColumn";
-  protected static final String TDIGEST_COLUMN = "tDigestColumn";
+  protected static final String BYTES_COLUMN = "bytesColumn";
   protected static final String GROUP_BY_COLUMN = "groupByColumn";
   protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"};
   protected static final long RANDOM_SEED = System.nanoTime();
   protected static final Random RANDOM = new Random(RANDOM_SEED);
-  protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
 
   private ImmutableSegment _indexSegment;
   private List<SegmentDataManager> _segmentDataManagers;
@@ -118,20 +116,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
         Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new ImmutableSegmentDataManager(_indexSegment));
   }
 
-  protected void buildSegment()
+  private void buildSegment()
       throws Exception {
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
     for (int i = 0; i < NUM_ROWS; i++) {
       HashMap<String, Object> valueMap = new HashMap<>();
 
-      double value = RANDOM.nextDouble() * VALUE_RANGE;
-      valueMap.put(DOUBLE_COLUMN, value);
-
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
-      tDigest.add(value);
-      ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
-      tDigest.asBytes(byteBuffer);
-      valueMap.put(TDIGEST_COLUMN, byteBuffer.array());
+      valueMap.put(BYTES_COLUMN, FieldSpec.DEFAULT_METRIC_NULL_VALUE_OF_BYTES);
 
       String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
       valueMap.put(GROUP_BY_COLUMN, group);
@@ -142,15 +133,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
     }
 
     Schema schema = new Schema();
-    schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE));
-    schema.addField(new MetricFieldSpec(TDIGEST_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec(BYTES_COLUMN, FieldSpec.DataType.BYTES));
     schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
 
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
     config.setOutDir(INDEX_DIR.getPath());
     config.setTableName(TABLE_NAME);
     config.setSegmentName(SEGMENT_NAME);
-    config.setRawIndexCreationColumns(Collections.singletonList(TDIGEST_COLUMN));
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     try (RecordReader recordReader = new GenericRowRecordReader(rows, schema)) {
@@ -162,97 +151,128 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery(0));
+    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery());
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     Assert.assertNotNull(aggregationResult);
-    Assert.assertEquals(aggregationResult.size(), 3);
-    DoubleList doubleList = (DoubleList) aggregationResult.get(0);
-    Collections.sort(doubleList);
-    assertTDigest((TDigest) aggregationResult.get(1), doubleList);
-    assertTDigest((TDigest) aggregationResult.get(2), doubleList);
+    assertEquals(aggregationResult.size(), 5);
+    // Avg
+    AvgPair avgPair = (AvgPair) aggregationResult.get(0);
+    assertEquals(avgPair.getSum(), 0.0);
+    assertEquals(avgPair.getCount(), 0L);
+    // DistinctCountHLL
+    HyperLogLog hyperLogLog = (HyperLogLog) aggregationResult.get(1);
+    assertEquals(hyperLogLog.cardinality(), 0L);
+    // MinMaxRange
+    MinMaxRangePair minMaxRangePair = (MinMaxRangePair) aggregationResult.get(2);
+    assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+    assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+    // PercentileEst
+    QuantileDigest quantileDigest = (QuantileDigest) aggregationResult.get(3);
+    assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+    // PercentileTDigest
+    TDigest tDigest = (TDigest) aggregationResult.get(4);
+    assertTrue(Double.isNaN(tDigest.quantile(0.5)));
   }
 
   @Test
   public void testInterSegmentAggregation() {
     for (int percentile = 0; percentile <= 100; percentile++) {
-      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery(percentile));
+      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery());
       List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
       Assert.assertNotNull(aggregationResults);
-      Assert.assertEquals(aggregationResults.size(), 3);
-      double expected = Double.parseDouble((String) aggregationResults.get(0).getValue());
-      double resultForDoubleColumn = Double.parseDouble((String) aggregationResults.get(1).getValue());
-      Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
-      double resultForTDigestColumn = Double.parseDouble((String) aggregationResults.get(2).getValue());
-      Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+      assertEquals(aggregationResults.size(), 5);
+      // Avg
+      assertEquals(Double.parseDouble((String) aggregationResults.get(0).getValue()), Double.NEGATIVE_INFINITY);
+      // DistinctCountHLL
+      assertEquals(Long.parseLong((String) aggregationResults.get(1).getValue()), 0L);
+      // MinMaxRange
+      assertEquals(Double.parseDouble((String) aggregationResults.get(2).getValue()), Double.NEGATIVE_INFINITY);
+      // PercentileEst
+      assertEquals(Long.parseLong((String) aggregationResults.get(3).getValue()), Long.MIN_VALUE);
+      // PercentileTDigest
+      assertTrue(Double.isNaN(Double.parseDouble((String) aggregationResults.get(4).getValue())));
     }
   }
 
   @Test
   public void testInnerSegmentGroupBy() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery(0));
+    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
     Assert.assertNotNull(groupByResult);
     Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
     while (groupKeyIterator.hasNext()) {
       GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
-      DoubleList doubleList = (DoubleList) groupByResult.getResultForKey(groupKey, 0);
-      Collections.sort(doubleList);
-      assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 1), doubleList);
-      assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 2), doubleList);
+      // Avg
+      AvgPair avgPair = (AvgPair) groupByResult.getResultForKey(groupKey, 0);
+      assertEquals(avgPair.getSum(), 0.0);
+      assertEquals(avgPair.getCount(), 0L);
+      // DistinctCountHLL
+      HyperLogLog hyperLogLog = (HyperLogLog) groupByResult.getResultForKey(groupKey, 1);
+      assertEquals(hyperLogLog.cardinality(), 0L);
+      // MinMaxRange
+      MinMaxRangePair minMaxRangePair = (MinMaxRangePair) groupByResult.getResultForKey(groupKey, 2);
+      assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+      assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+      // PercentileEst
+      QuantileDigest quantileDigest = (QuantileDigest) groupByResult.getResultForKey(groupKey, 3);
+      assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+      // PercentileTDigest
+      TDigest tDigest = (TDigest) groupByResult.getResultForKey(groupKey, 4);
+      assertTrue(Double.isNaN(tDigest.quantile(0.5)));
     }
   }
 
   @Test
   public void testInterSegmentGroupBy() {
     for (int percentile = 0; percentile <= 100; percentile++) {
-      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery(percentile));
+      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery());
       List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
       Assert.assertNotNull(aggregationResults);
-      Assert.assertEquals(aggregationResults.size(), 3);
-      Map<String, Double> expectedValues = new HashMap<>();
-      for (GroupByResult groupByResult : aggregationResults.get(0).getGroupByResult()) {
-        expectedValues.put(groupByResult.getGroup().get(0), Double.parseDouble((String) groupByResult.getValue()));
+      assertEquals(aggregationResults.size(), 5);
+      // Avg
+      List<GroupByResult> groupByResults = aggregationResults.get(0).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
+      }
+      // DistinctCountHLL
+      groupByResults = aggregationResults.get(1).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Long.parseLong((String) groupByResult.getValue()), 0L);
       }
-      for (GroupByResult groupByResult : aggregationResults.get(1).getGroupByResult()) {
-        String group = groupByResult.getGroup().get(0);
-        double expected = expectedValues.get(group);
-        double resultForDoubleColumn = Double.parseDouble((String) groupByResult.getValue());
-        Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
+      // MinMaxRange
+      groupByResults = aggregationResults.get(2).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
       }
-      for (GroupByResult groupByResult : aggregationResults.get(2).getGroupByResult()) {
-        String group = groupByResult.getGroup().get(0);
-        double expected = expectedValues.get(group);
-        double resultForTDigestColumn = Double.parseDouble((String) groupByResult.getValue());
-        Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+      // PercentileEst
+      groupByResults = aggregationResults.get(3).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Long.parseLong((String) groupByResult.getValue()), Long.MIN_VALUE);
+      }
+      // PercentileTDigest
+      groupByResults = aggregationResults.get(4).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertTrue(Double.isNaN(Double.parseDouble((String) groupByResult.getValue())));
       }
     }
   }
 
-  protected String getAggregationQuery(int percentile) {
-    return String
-        .format("SELECT PERCENTILE%d(%s), PERCENTILETDIGEST%d(%s), PERCENTILETDIGEST%d(%s) FROM %s", percentile,
-            DOUBLE_COLUMN, percentile, DOUBLE_COLUMN, percentile, TDIGEST_COLUMN, TABLE_NAME);
+  private String getAggregationQuery() {
+    return String.format(
+        "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s",
+        BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, TABLE_NAME);
   }
 
-  private String getGroupByQuery(int percentile) {
-    return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN);
-  }
-
-  private void assertTDigest(TDigest tDigest, DoubleList doubleList) {
-    for (int percentile = 0; percentile <= 100; percentile++) {
-      double expected;
-      if (percentile == 100) {
-        expected = doubleList.getDouble(doubleList.size() - 1);
-      } else {
-        expected = doubleList.getDouble(doubleList.size() * percentile / 100);
-      }
-      Assert
-          .assertEquals(PercentileTDigestAggregationFunction.calculatePercentile(tDigest, percentile), expected, DELTA,
-              ERROR_MESSAGE);
-    }
+  private String getGroupByQuery() {
+    return String.format("%s GROUP BY %s", getAggregationQuery(), GROUP_BY_COLUMN);
   }
 
   @AfterClass
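
For context, here is a minimal sketch of the zero-length-BYTES fallback that the default assertions above exercise. This is illustrative code, not part of the patch: the class and method names (DefaultBytesSketch, fromBytes) are hypothetical, ObjectSerDeUtils.TDIGEST_SER_DE is assumed to be the existing TDigest serializer in pinot-core, and getDefaultTDigest() is the helper used by the test changes below.

    import com.tdunning.math.stats.TDigest;
    import org.apache.pinot.core.common.ObjectSerDeUtils;
    import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;

    public class DefaultBytesSketch {
      // Treat a zero-length byte array as "no value": fall back to the default
      // (empty) TDigest instead of trying to deserialize it. The assertions
      // above expect quantile(0.5) of that default digest to be NaN.
      static TDigest fromBytes(byte[] rawValue) {
        if (rawValue.length == 0) {
          return PercentileTDigestAggregationFunction.getDefaultTDigest();
        }
        // Assumed serializer for non-empty BYTES values.
        return ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(rawValue);
      }
    }
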
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
index 4f2bf59..a9f6218 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
@@ -60,7 +60,7 @@ public class PercentileTDigestMVQueriesTest extends PercentileTDigestQueriesTest
 
       int numMultiValues = RANDOM.nextInt(MAX_NUM_MULTI_VALUES) + 1;
       Double[] values = new Double[numMultiValues];
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
       for (int j = 0; j < numMultiValues; j++) {
         double value = RANDOM.nextDouble() * VALUE_RANGE;
         values[j] = value;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index 67ab0fa..6206120 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -127,7 +127,7 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
       double value = RANDOM.nextDouble() * VALUE_RANGE;
       valueMap.put(DOUBLE_COLUMN, value);
 
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
       tDigest.add(value);
       ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
       tDigest.asBytes(byteBuffer);
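
The two hunks above replace the hard-coded TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION) call with the shared PercentileTDigestAggregationFunction.getDefaultTDigest() factory. A small sketch of the serialization pattern both tests follow (illustrative only; the helper name toTDigestBytes is not from the patch):

    import java.nio.ByteBuffer;
    import com.tdunning.math.stats.TDigest;
    import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;

    public class TDigestBytesSketch {
      // Build a single-value TDigest with the shared default factory and
      // serialize it to a BYTES value, mirroring the test setup above.
      static byte[] toTDigestBytes(double value) {
        TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
        tDigest.add(value);
        ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
        tDigest.asBytes(byteBuffer);
        return byteBuffer.array();
      }
    }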

