You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/16 22:16:14 UTC

[incubator-pinot] branch default_bytes created (now c8169e4)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch default_bytes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at c8169e4  Support default BYTES (zero-length byte array) in aggregation function and aggregator

This branch includes the following new commits:

     new c8169e4  Support default BYTES (zero-length byte array) in aggregation function and aggregator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support default BYTES (zero-length byte array) in aggregation function and aggregator

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch default_bytes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c8169e409e2217064e42c268077786d21da46134
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu May 16 15:13:57 2019 -0700

    Support default BYTES (zero-length byte array) in aggregation function and aggregator
    
    Treat zero-length byte array as null, and skip processing it
    Add DefaultBytesQueriesTest for test coverage
---
 .../core/data/aggregator/AvgValueAggregator.java   |  15 +-
 .../DistinctCountHLLValueAggregator.java           |  24 ++-
 .../aggregator/MinMaxRangeValueAggregator.java     |  15 +-
 .../aggregator/PercentileEstValueAggregator.java   |  21 ++-
 .../PercentileTDigestValueAggregator.java          |  23 ++-
 .../function/AvgAggregationFunction.java           |  49 ++++--
 .../DistinctCountHLLAggregationFunction.java       |  31 +++-
 .../function/MinMaxRangeAggregationFunction.java   |  51 +++---
 .../function/PercentileEstAggregationFunction.java |  49 ++++--
 .../PercentileTDigestAggregationFunction.java      |  47 ++++--
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |   4 +-
 .../SegmentGenerationWithBytesTypeTest.java        |   2 +-
 ...eriesTest.java => DefaultBytesQueriesTest.java} | 188 ++++++++++++---------
 .../queries/PercentileTDigestMVQueriesTest.java    |   2 +-
 .../queries/PercentileTDigestQueriesTest.java      |   2 +-
 15 files changed, 331 insertions(+), 192 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
index e5841f7..a189105 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.AvgAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 
 
@@ -40,7 +41,13 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
   @Override
   public AvgPair getInitialAggregatedValue(Object rawValue) {
     if (rawValue instanceof byte[]) {
-      return deserializeAggregatedValue((byte[]) rawValue);
+      // Use default value for zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        return deserializeAggregatedValue(bytes);
+      } else {
+        return AvgAggregationFunction.getDefaultAvgPair();
+      }
     } else {
       return new AvgPair(((Number) rawValue).doubleValue(), 1L);
     }
@@ -49,7 +56,11 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
   @Override
   public AvgPair applyRawValue(AvgPair value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.apply(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.apply(deserializeAggregatedValue(bytes));
+      }
     } else {
       value.apply(((Number) rawValue).doubleValue(), 1L);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index 15ad278..5c0eab9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -47,11 +47,17 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
   public HyperLogLog getInitialAggregatedValue(Object rawValue) {
     HyperLogLog initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
+        _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
+      }
     } else {
-      initialValue = new HyperLogLog(DistinctCountHLLAggregationFunction.DEFAULT_LOG2M);
+      initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
       initialValue.offer(rawValue);
       _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
     }
@@ -61,10 +67,14 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
   @Override
   public HyperLogLog applyRawValue(HyperLogLog value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      try {
-        value.addAll(deserializeAggregatedValue((byte[]) rawValue));
-      } catch (CardinalityMergeException e) {
-        throw new RuntimeException(e);
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        try {
+          value.addAll(deserializeAggregatedValue(bytes));
+        } catch (CardinalityMergeException e) {
+          throw new RuntimeException(e);
+        }
       }
     } else {
       value.offer(rawValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
index a6a8d55..2d176df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.MinMaxRangeAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 
 
@@ -40,7 +41,13 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
   @Override
   public MinMaxRangePair getInitialAggregatedValue(Object rawValue) {
     if (rawValue instanceof byte[]) {
-      return deserializeAggregatedValue((byte[]) rawValue);
+      // Use default value for zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        return deserializeAggregatedValue(bytes);
+      } else {
+        return MinMaxRangeAggregationFunction.getDefaultMinMaxRangePair();
+      }
     } else {
       double doubleValue = ((Number) rawValue).doubleValue();
       return new MinMaxRangePair(doubleValue, doubleValue);
@@ -50,7 +57,11 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
   @Override
   public MinMaxRangePair applyRawValue(MinMaxRangePair value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.apply(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.apply(deserializeAggregatedValue(bytes));
+      }
     } else {
       double doubleValue = ((Number) rawValue).doubleValue();
       value.apply(doubleValue, doubleValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
index a57f324..c9a9e8c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
   public QuantileDigest getInitialAggregatedValue(Object rawValue) {
     QuantileDigest initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
+        _maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
+      }
     } else {
-      initialValue = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+      initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
       initialValue.add(((Number) rawValue).longValue());
       _maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
     }
@@ -58,11 +64,16 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
   @Override
   public QuantileDigest applyRawValue(QuantileDigest value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.merge(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.merge(deserializeAggregatedValue(bytes));
+        _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
+      }
     } else {
       value.add(((Number) rawValue).longValue());
+      _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
     }
-    _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
     return value;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
index 9d398eb..545ed7f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
   public TDigest getInitialAggregatedValue(Object rawValue) {
     TDigest initialValue;
     if (rawValue instanceof byte[]) {
+      // Use default value for zero-length byte array
       byte[] bytes = (byte[]) rawValue;
-      initialValue = deserializeAggregatedValue(bytes);
-      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      if (bytes.length != 0) {
+        initialValue = deserializeAggregatedValue(bytes);
+        _maxByteSize = Math.max(_maxByteSize, bytes.length);
+      } else {
+        initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
+        _maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
+      }
     } else {
-      initialValue = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
       initialValue.add(((Number) rawValue).doubleValue());
       _maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
     }
@@ -58,11 +64,16 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
   @Override
   public TDigest applyRawValue(TDigest value, Object rawValue) {
     if (rawValue instanceof byte[]) {
-      value.add(deserializeAggregatedValue((byte[]) rawValue));
+      // Skip zero-length byte array
+      byte[] bytes = (byte[]) rawValue;
+      if (bytes.length != 0) {
+        value.add(deserializeAggregatedValue(bytes));
+        _maxByteSize = Math.max(_maxByteSize, value.byteSize());
+      }
     } else {
-      value.add(((Number) rawValue).doubleValue());
+      value.add(((Number) rawValue).longValue());
+      _maxByteSize = Math.max(_maxByteSize, value.byteSize());
     }
-    _maxByteSize = Math.max(_maxByteSize, value.byteSize());
     return value;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 6aef5f0..a56ffbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class AvgAggregationFunction implements AggregationFunction<AvgPair, Double> {
   private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY;
 
+  public static AvgPair getDefaultAvgPair() {
+    return new AvgPair(0.0, 0L);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -71,10 +75,10 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         double sum = 0.0;
         for (int i = 0; i < length; i++) {
-          sum += valueArray[i];
+          sum += doubleValues[i];
         }
         setAggregationResult(aggregationResultHolder, sum, (long) length);
         break;
@@ -84,9 +88,12 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
         sum = 0.0;
         long count = 0L;
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          sum += value.getSum();
-          count += value.getCount();
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            sum += value.getSum();
+            count += value.getCount();
+          }
         }
         setAggregationResult(aggregationResultHolder, sum, count);
         break;
@@ -114,17 +121,20 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, valueArray[i], 1L);
+          setGroupByResult(groupKeyArray[i], groupByResultHolder, doubleValues[i], 1L);
         }
         break;
       case BYTES:
         // Serialized AvgPair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+          }
         }
         break;
       default:
@@ -141,9 +151,9 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             setGroupByResult(groupKey, groupByResultHolder, value, 1L);
           }
@@ -153,11 +163,14 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
         // Serialized AvgPair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
-          double sum = value.getSum();
-          long count = value.getCount();
-          for (int groupKey : groupKeysArray[i]) {
-            setGroupByResult(groupKey, groupByResultHolder, sum, count);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+            double sum = value.getSum();
+            long count = value.getCount();
+            for (int groupKey : groupKeysArray[i]) {
+              setGroupByResult(groupKey, groupByResultHolder, sum, count);
+            }
           }
         }
         break;
@@ -181,7 +194,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
   public AvgPair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     AvgPair avgPair = aggregationResultHolder.getResult();
     if (avgPair == null) {
-      return new AvgPair(0.0, 0L);
+      return getDefaultAvgPair();
     } else {
       return avgPair;
     }
@@ -192,7 +205,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
   public AvgPair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     AvgPair avgPair = groupByResultHolder.getResult(groupKey);
     if (avgPair == null) {
-      return new AvgPair(0.0, 0L);
+      return getDefaultAvgPair();
     } else {
       return avgPair;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index f35709c..25826e0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -34,6 +34,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class DistinctCountHLLAggregationFunction implements AggregationFunction<HyperLogLog, Long> {
   public static final int DEFAULT_LOG2M = 8;
 
+  public static HyperLogLog getDefaultHyperLogLog() {
+    return new HyperLogLog(DEFAULT_LOG2M);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -105,7 +109,10 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -156,8 +163,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
-                ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
+                  ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -208,8 +218,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         try {
           for (int i = 0; i < length; i++) {
-            setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
-                ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            // Skip zero-length byte array
+            if (bytesValues[i].length != 0) {
+              setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
+                  ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+            }
           }
         } catch (Exception e) {
           throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -225,7 +238,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   public HyperLogLog extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return getDefaultHyperLogLog();
     } else {
       return hyperLogLog;
     }
@@ -236,7 +249,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   public HyperLogLog extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      return new HyperLogLog(DEFAULT_LOG2M);
+      return getDefaultHyperLogLog();
     } else {
       return hyperLogLog;
     }
@@ -335,7 +348,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   protected static HyperLogLog getHyperLogLog(@Nonnull AggregationResultHolder aggregationResultHolder) {
     HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = getDefaultHyperLogLog();
       aggregationResultHolder.setValue(hyperLogLog);
     }
     return hyperLogLog;
@@ -351,7 +364,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
   protected static HyperLogLog getHyperLogLog(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
     if (hyperLogLog == null) {
-      hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+      hyperLogLog = getDefaultHyperLogLog();
       groupByResultHolder.setValueForKey(groupKey, hyperLogLog);
     }
     return hyperLogLog;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 717817c..84d5e06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -32,6 +32,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 
 public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMaxRangePair, Double> {
 
+  public static MinMaxRangePair getDefaultMinMaxRangePair() {
+    return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+  }
+
   @Nonnull
   @Override
   public AggregationFunctionType getType() {
@@ -70,11 +74,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         double min = Double.POSITIVE_INFINITY;
         double max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           if (value < min) {
             min = value;
           }
@@ -90,12 +94,15 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         min = Double.POSITIVE_INFINITY;
         max = Double.NEGATIVE_INFINITY;
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          if (value.getMin() < min) {
-            min = value.getMin();
-          }
-          if (value.getMax() > max) {
-            max = value.getMax();
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            if (value.getMin() < min) {
+              min = value.getMin();
+            }
+            if (value.getMax() > max) {
+              max = value.getMax();
+            }
           }
         }
         setAggregationResult(aggregationResultHolder, min, max);
@@ -124,9 +131,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value);
         }
         break;
@@ -134,8 +141,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         // Serialized MinMaxRangePair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+          }
         }
         break;
       default:
@@ -152,9 +162,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             setGroupByResult(groupKey, groupByResultHolder, value, value);
           }
@@ -164,9 +174,12 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
         // Serialized MinMaxRangePair
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
-          for (int groupKey : groupKeysArray[i]) {
-            setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+            }
           }
         }
         break;
@@ -190,7 +203,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
   public MinMaxRangePair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult();
     if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+      return getDefaultMinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
@@ -201,7 +214,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
   public MinMaxRangePair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey);
     if (minMaxRangePair == null) {
-      return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+      return getDefaultMinMaxRangePair();
     } else {
       return minMaxRangePair;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
index 3c2c91b..6c0b8ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class PercentileEstAggregationFunction implements AggregationFunction<QuantileDigest, Long> {
   public static final double DEFAULT_MAX_ERROR = 0.05;
 
+  public static QuantileDigest getDefaultQuantileDigest() {
+    return new QuantileDigest(DEFAULT_MAX_ERROR);
+  }
+
   protected final int _percentile;
 
   public PercentileEstAggregationFunction(int percentile) {
@@ -79,16 +83,19 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          quantileDigest.add((long) valueArray[i]);
+          quantileDigest.add(longValues[i]);
         }
         break;
       case BYTES:
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          }
         }
         break;
       default:
@@ -105,18 +112,21 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
           QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
-          quantileDigest.add((long) valueArray[i]);
+          quantileDigest.add(longValues[i]);
         }
         break;
       case BYTES:
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
-          quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
+            quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+          }
         }
         break;
       default:
@@ -133,12 +143,12 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        long[] longValues = blockValSets[0].getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          long value = longValues[i];
           for (int groupKey : groupKeysArray[i]) {
             QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
-            quantileDigest.add((long) value);
+            quantileDigest.add(value);
           }
         }
         break;
@@ -146,10 +156,13 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
-          for (int groupKey : groupKeysArray[i]) {
-            QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
-            quantileDigest.merge(value);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
+              quantileDigest.merge(value);
+            }
           }
         }
         break;
@@ -163,7 +176,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   public QuantileDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     QuantileDigest quantileDigest = aggregationResultHolder.getResult();
     if (quantileDigest == null) {
-      return new QuantileDigest(DEFAULT_MAX_ERROR);
+      return getDefaultQuantileDigest();
     } else {
       return quantileDigest;
     }
@@ -174,7 +187,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   public QuantileDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
     if (quantileDigest == null) {
-      return new QuantileDigest(DEFAULT_MAX_ERROR);
+      return getDefaultQuantileDigest();
     } else {
       return quantileDigest;
     }
@@ -214,7 +227,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   protected static QuantileDigest getQuantileDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
     QuantileDigest quantileDigest = aggregationResultHolder.getResult();
     if (quantileDigest == null) {
-      quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+      quantileDigest = getDefaultQuantileDigest();
       aggregationResultHolder.setValue(quantileDigest);
     }
     return quantileDigest;
@@ -230,7 +243,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
   protected static QuantileDigest getQuantileDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
     if (quantileDigest == null) {
-      quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+      quantileDigest = getDefaultQuantileDigest();
       groupByResultHolder.setValueForKey(groupKey, quantileDigest);
     }
     return quantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
index 0d4db7f..d037535 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
@@ -37,6 +37,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
 public class PercentileTDigestAggregationFunction implements AggregationFunction<TDigest, Double> {
   public static final int DEFAULT_TDIGEST_COMPRESSION = 100;
 
+  public static TDigest getDefaultTDigest() {
+    return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+  }
+
   protected final int _percentile;
 
   public PercentileTDigestAggregationFunction(int percentile) {
@@ -83,16 +87,19 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          tDigest.add(valueArray[i]);
+          tDigest.add(doubleValues[i]);
         }
         break;
       case BYTES:
         // Serialized TDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          }
         }
         break;
       default:
@@ -109,18 +116,21 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
           TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
-          tDigest.add(valueArray[i]);
+          tDigest.add(doubleValues[i]);
         }
         break;
       case BYTES:
         // Serialized TDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
-          tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
+            tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+          }
         }
         break;
       default:
@@ -137,9 +147,9 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
       case LONG:
       case FLOAT:
       case DOUBLE:
-        double[] valueArray = blockValSets[0].getDoubleValuesSV();
+        double[] doubleValues = blockValSets[0].getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          double value = valueArray[i];
+          double value = doubleValues[i];
           for (int groupKey : groupKeysArray[i]) {
             TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
             tDigest.add(value);
@@ -149,10 +159,13 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
         // Serialized QuantileDigest
         byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
-          for (int groupKey : groupKeysArray[i]) {
-            TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
-            tDigest.add(value);
+          // Skip zero-length byte array
+          if (bytesValues[i].length != 0) {
+            TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
+            for (int groupKey : groupKeysArray[i]) {
+              TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
+              tDigest.add(value);
+            }
           }
         }
         break;
@@ -166,7 +179,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   public TDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
     TDigest tDigest = aggregationResultHolder.getResult();
     if (tDigest == null) {
-      return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      return getDefaultTDigest();
     } else {
       return tDigest;
     }
@@ -177,7 +190,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   public TDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     TDigest tDigest = groupByResultHolder.getResult(groupKey);
     if (tDigest == null) {
-      return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      return getDefaultTDigest();
     } else {
       return tDigest;
     }
@@ -229,7 +242,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   protected static TDigest getTDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
     TDigest tDigest = aggregationResultHolder.getResult();
     if (tDigest == null) {
-      tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      tDigest = getDefaultTDigest();
       aggregationResultHolder.setValue(tDigest);
     }
     return tDigest;
@@ -245,7 +258,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
   protected static TDigest getTDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
     TDigest tDigest = groupByResultHolder.getResult(groupKey);
     if (tDigest == null) {
-      tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+      tDigest = getDefaultTDigest();
       groupByResultHolder.setValueForKey(groupKey, tDigest);
     }
     return tDigest;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8350ee6..74515b0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -137,7 +137,7 @@ public class ObjectSerDeUtilsTest {
   @Test
   public void testQuantileDigest() {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
-      QuantileDigest expected = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+      QuantileDigest expected = PercentileEstAggregationFunction.getDefaultQuantileDigest();
       int size = RANDOM.nextInt(100) + 1;
       for (int j = 0; j < size; j++) {
         expected.add(RANDOM.nextLong());
@@ -188,7 +188,7 @@ public class ObjectSerDeUtilsTest {
   @Test
   public void testTDigest() {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
-      TDigest expected = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest expected = PercentileTDigestAggregationFunction.getDefaultTDigest();
       int size = RANDOM.nextInt(100) + 1;
       for (int j = 0; j < size; j++) {
         expected.add(RANDOM.nextDouble());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
index 6d6d810..7749a44 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
@@ -285,7 +285,7 @@ public class SegmentGenerationWithBytesTypeTest {
       for (int i = 0; i < NUM_ROWS; i++) {
         GenericData.Record record = new GenericData.Record(avroSchema);
 
-        TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+        TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
         tDigest.add(_random.nextDouble());
 
         ByteBuffer buffer = ByteBuffer.allocate(tDigest.byteSize());
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
similarity index 55%
copy from pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
index 67ab0fa..1686513 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
@@ -18,17 +18,14 @@
  */
 package org.apache.pinot.queries;
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import com.tdunning.math.stats.TDigest;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.DimensionFieldSpec;
@@ -51,7 +48,9 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
+import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -60,34 +59,33 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
- * Tests for PERCENTILE_TDIGEST aggregation function.
+ * Tests for default bytes (zero-length byte array) values.
  *
+ * <p>Aggregation functions that support bytes values:
  * <ul>
- *   <li>Generates a segment with a double column, a TDigest column and a group-by column</li>
- *   <li>Runs aggregation and group-by queries on the generated segment</li>
- *   <li>
- *     Compares the results for PERCENTILE_TDIGEST on double column and TDigest column with results for PERCENTILE on
- *     double column
- *   </li>
+ *   <li>AVG</li>
+ *   <li>DISTINCTCOUNTHLL</li>
+ *   <li>MINMAXRANGE</li>
+ *   <li>PERCENTILEEST</li>
+ *   <li>PERCENTILETDIGEST</li>
  * </ul>
  */
-public class PercentileTDigestQueriesTest extends BaseQueriesTest {
-  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileTDigestQueriesTest");
+public class DefaultBytesQueriesTest extends BaseQueriesTest {
+  protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DefaultBytesQueriesTest");
   protected static final String TABLE_NAME = "testTable";
   protected static final String SEGMENT_NAME = "testSegment";
 
   protected static final int NUM_ROWS = 1000;
-  protected static final double VALUE_RANGE = Integer.MAX_VALUE;
-  protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% quantile error
-  protected static final String DOUBLE_COLUMN = "doubleColumn";
-  protected static final String TDIGEST_COLUMN = "tDigestColumn";
+  protected static final String BYTES_COLUMN = "bytesColumn";
   protected static final String GROUP_BY_COLUMN = "groupByColumn";
   protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"};
   protected static final long RANDOM_SEED = System.nanoTime();
   protected static final Random RANDOM = new Random(RANDOM_SEED);
-  protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
 
   private ImmutableSegment _indexSegment;
   private List<SegmentDataManager> _segmentDataManagers;
@@ -118,20 +116,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
         Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new ImmutableSegmentDataManager(_indexSegment));
   }
 
-  protected void buildSegment()
+  private void buildSegment()
       throws Exception {
     List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
     for (int i = 0; i < NUM_ROWS; i++) {
       HashMap<String, Object> valueMap = new HashMap<>();
 
-      double value = RANDOM.nextDouble() * VALUE_RANGE;
-      valueMap.put(DOUBLE_COLUMN, value);
-
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
-      tDigest.add(value);
-      ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
-      tDigest.asBytes(byteBuffer);
-      valueMap.put(TDIGEST_COLUMN, byteBuffer.array());
+      valueMap.put(BYTES_COLUMN, FieldSpec.DEFAULT_METRIC_NULL_VALUE_OF_BYTES);
 
       String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
       valueMap.put(GROUP_BY_COLUMN, group);
@@ -142,15 +133,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
     }
 
     Schema schema = new Schema();
-    schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE));
-    schema.addField(new MetricFieldSpec(TDIGEST_COLUMN, FieldSpec.DataType.BYTES));
+    schema.addField(new MetricFieldSpec(BYTES_COLUMN, FieldSpec.DataType.BYTES));
     schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
 
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
     config.setOutDir(INDEX_DIR.getPath());
     config.setTableName(TABLE_NAME);
     config.setSegmentName(SEGMENT_NAME);
-    config.setRawIndexCreationColumns(Collections.singletonList(TDIGEST_COLUMN));
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     try (RecordReader recordReader = new GenericRowRecordReader(rows, schema)) {
@@ -162,97 +151,128 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery(0));
+    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery());
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     Assert.assertNotNull(aggregationResult);
-    Assert.assertEquals(aggregationResult.size(), 3);
-    DoubleList doubleList = (DoubleList) aggregationResult.get(0);
-    Collections.sort(doubleList);
-    assertTDigest((TDigest) aggregationResult.get(1), doubleList);
-    assertTDigest((TDigest) aggregationResult.get(2), doubleList);
+    assertEquals(aggregationResult.size(), 5);
+    // Avg
+    AvgPair avgPair = (AvgPair) aggregationResult.get(0);
+    assertEquals(avgPair.getSum(), 0.0);
+    assertEquals(avgPair.getCount(), 0L);
+    // DistinctCountHLL
+    HyperLogLog hyperLogLog = (HyperLogLog) aggregationResult.get(1);
+    assertEquals(hyperLogLog.cardinality(), 0L);
+    // MinMaxRange
+    MinMaxRangePair minMaxRangePair = (MinMaxRangePair) aggregationResult.get(2);
+    assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+    assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+    // PercentileEst
+    QuantileDigest quantileDigest = (QuantileDigest) aggregationResult.get(3);
+    assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+    // PercentileTDigest
+    TDigest tDigest = (TDigest) aggregationResult.get(4);
+    assertTrue(Double.isNaN(tDigest.quantile(0.5)));
   }
 
   @Test
   public void testInterSegmentAggregation() {
     for (int percentile = 0; percentile <= 100; percentile++) {
-      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery(percentile));
+      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery());
       List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
       Assert.assertNotNull(aggregationResults);
-      Assert.assertEquals(aggregationResults.size(), 3);
-      double expected = Double.parseDouble((String) aggregationResults.get(0).getValue());
-      double resultForDoubleColumn = Double.parseDouble((String) aggregationResults.get(1).getValue());
-      Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
-      double resultForTDigestColumn = Double.parseDouble((String) aggregationResults.get(2).getValue());
-      Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+      assertEquals(aggregationResults.size(), 5);
+      // Avg
+      assertEquals(Double.parseDouble((String) aggregationResults.get(0).getValue()), Double.NEGATIVE_INFINITY);
+      // DistinctCountHLL
+      assertEquals(Long.parseLong((String) aggregationResults.get(1).getValue()), 0L);
+      // MinMaxRange
+      assertEquals(Double.parseDouble((String) aggregationResults.get(2).getValue()), Double.NEGATIVE_INFINITY);
+      // PercentileEst
+      assertEquals(Long.parseLong((String) aggregationResults.get(3).getValue()), Long.MIN_VALUE);
+      // PercentileTDigest
+      assertTrue(Double.isNaN(Double.parseDouble((String) aggregationResults.get(4).getValue())));
     }
   }
 
   @Test
   public void testInnerSegmentGroupBy() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery(0));
+    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
     Assert.assertNotNull(groupByResult);
     Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
     while (groupKeyIterator.hasNext()) {
       GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
-      DoubleList doubleList = (DoubleList) groupByResult.getResultForKey(groupKey, 0);
-      Collections.sort(doubleList);
-      assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 1), doubleList);
-      assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 2), doubleList);
+      // Avg
+      AvgPair avgPair = (AvgPair) groupByResult.getResultForKey(groupKey, 0);
+      assertEquals(avgPair.getSum(), 0.0);
+      assertEquals(avgPair.getCount(), 0L);
+      // DistinctCountHLL
+      HyperLogLog hyperLogLog = (HyperLogLog) groupByResult.getResultForKey(groupKey, 1);
+      assertEquals(hyperLogLog.cardinality(), 0L);
+      // MinMaxRange
+      MinMaxRangePair minMaxRangePair = (MinMaxRangePair) groupByResult.getResultForKey(groupKey, 2);
+      assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+      assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+      // PercentileEst
+      QuantileDigest quantileDigest = (QuantileDigest) groupByResult.getResultForKey(groupKey, 3);
+      assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+      // PercentileTDigest
+      TDigest tDigest = (TDigest) groupByResult.getResultForKey(groupKey, 4);
+      assertTrue(Double.isNaN(tDigest.quantile(0.5)));
     }
   }
 
   @Test
   public void testInterSegmentGroupBy() {
     for (int percentile = 0; percentile <= 100; percentile++) {
-      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery(percentile));
+      BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery());
       List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
       Assert.assertNotNull(aggregationResults);
-      Assert.assertEquals(aggregationResults.size(), 3);
-      Map<String, Double> expectedValues = new HashMap<>();
-      for (GroupByResult groupByResult : aggregationResults.get(0).getGroupByResult()) {
-        expectedValues.put(groupByResult.getGroup().get(0), Double.parseDouble((String) groupByResult.getValue()));
+      assertEquals(aggregationResults.size(), 5);
+      // Avg
+      List<GroupByResult> groupByResults = aggregationResults.get(0).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
+      }
+      // DistinctCountHLL
+      groupByResults = aggregationResults.get(1).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Long.parseLong((String) groupByResult.getValue()), 0L);
       }
-      for (GroupByResult groupByResult : aggregationResults.get(1).getGroupByResult()) {
-        String group = groupByResult.getGroup().get(0);
-        double expected = expectedValues.get(group);
-        double resultForDoubleColumn = Double.parseDouble((String) groupByResult.getValue());
-        Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
+      // MinMaxRange
+      groupByResults = aggregationResults.get(2).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
       }
-      for (GroupByResult groupByResult : aggregationResults.get(2).getGroupByResult()) {
-        String group = groupByResult.getGroup().get(0);
-        double expected = expectedValues.get(group);
-        double resultForTDigestColumn = Double.parseDouble((String) groupByResult.getValue());
-        Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+      // PercentileEst
+      groupByResults = aggregationResults.get(3).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertEquals(Long.parseLong((String) groupByResult.getValue()), Long.MIN_VALUE);
+      }
+      // PercentileTDigest
+      groupByResults = aggregationResults.get(4).getGroupByResult();
+      assertEquals(groupByResults.size(), 3);
+      for (GroupByResult groupByResult : groupByResults) {
+        assertTrue(Double.isNaN(Double.parseDouble((String) groupByResult.getValue())));
       }
     }
   }
 
-  protected String getAggregationQuery(int percentile) {
-    return String
-        .format("SELECT PERCENTILE%d(%s), PERCENTILETDIGEST%d(%s), PERCENTILETDIGEST%d(%s) FROM %s", percentile,
-            DOUBLE_COLUMN, percentile, DOUBLE_COLUMN, percentile, TDIGEST_COLUMN, TABLE_NAME);
+  private String getAggregationQuery() {
+    return String.format(
+        "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s",
+        BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, TABLE_NAME);
   }
 
-  private String getGroupByQuery(int percentile) {
-    return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN);
-  }
-
-  private void assertTDigest(TDigest tDigest, DoubleList doubleList) {
-    for (int percentile = 0; percentile <= 100; percentile++) {
-      double expected;
-      if (percentile == 100) {
-        expected = doubleList.getDouble(doubleList.size() - 1);
-      } else {
-        expected = doubleList.getDouble(doubleList.size() * percentile / 100);
-      }
-      Assert
-          .assertEquals(PercentileTDigestAggregationFunction.calculatePercentile(tDigest, percentile), expected, DELTA,
-              ERROR_MESSAGE);
-    }
+  private String getGroupByQuery() {
+    return String.format("%s GROUP BY %s", getAggregationQuery(), GROUP_BY_COLUMN);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
index 4f2bf59..a9f6218 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
@@ -60,7 +60,7 @@ public class PercentileTDigestMVQueriesTest extends PercentileTDigestQueriesTest
 
       int numMultiValues = RANDOM.nextInt(MAX_NUM_MULTI_VALUES) + 1;
       Double[] values = new Double[numMultiValues];
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
       for (int j = 0; j < numMultiValues; j++) {
         double value = RANDOM.nextDouble() * VALUE_RANGE;
         values[j] = value;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index 67ab0fa..6206120 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -127,7 +127,7 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
       double value = RANDOM.nextDouble() * VALUE_RANGE;
       valueMap.put(DOUBLE_COLUMN, value);
 
-      TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+      TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
       tDigest.add(value);
       ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
       tDigest.asBytes(byteBuffer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org