You are viewing a plain text version of this content; the canonical (HTML) version is available in the mailing list archive.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/16 22:16:15 UTC
[incubator-pinot] 01/01: Support default BYTES (zero-length byte
array) in aggregation function and aggregator
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch default_bytes
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit c8169e409e2217064e42c268077786d21da46134
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu May 16 15:13:57 2019 -0700
Support default BYTES (zero-length byte array) in aggregation function and aggregator
Treat zero-length byte array as null, and skip processing it
Add DefaultBytesQueriesTest for test coverage
---
.../core/data/aggregator/AvgValueAggregator.java | 15 +-
.../DistinctCountHLLValueAggregator.java | 24 ++-
.../aggregator/MinMaxRangeValueAggregator.java | 15 +-
.../aggregator/PercentileEstValueAggregator.java | 21 ++-
.../PercentileTDigestValueAggregator.java | 23 ++-
.../function/AvgAggregationFunction.java | 49 ++++--
.../DistinctCountHLLAggregationFunction.java | 31 +++-
.../function/MinMaxRangeAggregationFunction.java | 51 +++---
.../function/PercentileEstAggregationFunction.java | 49 ++++--
.../PercentileTDigestAggregationFunction.java | 47 ++++--
.../pinot/core/common/ObjectSerDeUtilsTest.java | 4 +-
.../SegmentGenerationWithBytesTypeTest.java | 2 +-
...eriesTest.java => DefaultBytesQueriesTest.java} | 188 ++++++++++++---------
.../queries/PercentileTDigestMVQueriesTest.java | 2 +-
.../queries/PercentileTDigestQueriesTest.java | 2 +-
15 files changed, 331 insertions(+), 192 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
index e5841f7..a189105 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.AvgAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
@@ -40,7 +41,13 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
@Override
public AvgPair getInitialAggregatedValue(Object rawValue) {
if (rawValue instanceof byte[]) {
- return deserializeAggregatedValue((byte[]) rawValue);
+ // Use default value for zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ return deserializeAggregatedValue(bytes);
+ } else {
+ return AvgAggregationFunction.getDefaultAvgPair();
+ }
} else {
return new AvgPair(((Number) rawValue).doubleValue(), 1L);
}
@@ -49,7 +56,11 @@ public class AvgValueAggregator implements ValueAggregator<Object, AvgPair> {
@Override
public AvgPair applyRawValue(AvgPair value, Object rawValue) {
if (rawValue instanceof byte[]) {
- value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ // Skip zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ value.apply(deserializeAggregatedValue(bytes));
+ }
} else {
value.apply(((Number) rawValue).doubleValue(), 1L);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index 15ad278..5c0eab9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -47,11 +47,17 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
public HyperLogLog getInitialAggregatedValue(Object rawValue) {
HyperLogLog initialValue;
if (rawValue instanceof byte[]) {
+ // Use default value for zero-length byte array
byte[] bytes = (byte[]) rawValue;
- initialValue = deserializeAggregatedValue(bytes);
- _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ if (bytes.length != 0) {
+ initialValue = deserializeAggregatedValue(bytes);
+ _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ } else {
+ initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
+ _maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
+ }
} else {
- initialValue = new HyperLogLog(DistinctCountHLLAggregationFunction.DEFAULT_LOG2M);
+ initialValue = DistinctCountHLLAggregationFunction.getDefaultHyperLogLog();
initialValue.offer(rawValue);
_maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
}
@@ -61,10 +67,14 @@ public class DistinctCountHLLValueAggregator implements ValueAggregator<Object,
@Override
public HyperLogLog applyRawValue(HyperLogLog value, Object rawValue) {
if (rawValue instanceof byte[]) {
- try {
- value.addAll(deserializeAggregatedValue((byte[]) rawValue));
- } catch (CardinalityMergeException e) {
- throw new RuntimeException(e);
+ // Skip zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ try {
+ value.addAll(deserializeAggregatedValue(bytes));
+ } catch (CardinalityMergeException e) {
+ throw new RuntimeException(e);
+ }
}
} else {
value.offer(rawValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
index a6a8d55..2d176df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.core.query.aggregation.function.MinMaxRangeAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
@@ -40,7 +41,13 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
@Override
public MinMaxRangePair getInitialAggregatedValue(Object rawValue) {
if (rawValue instanceof byte[]) {
- return deserializeAggregatedValue((byte[]) rawValue);
+ // Use default value for zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ return deserializeAggregatedValue(bytes);
+ } else {
+ return MinMaxRangeAggregationFunction.getDefaultMinMaxRangePair();
+ }
} else {
double doubleValue = ((Number) rawValue).doubleValue();
return new MinMaxRangePair(doubleValue, doubleValue);
@@ -50,7 +57,11 @@ public class MinMaxRangeValueAggregator implements ValueAggregator<Object, MinMa
@Override
public MinMaxRangePair applyRawValue(MinMaxRangePair value, Object rawValue) {
if (rawValue instanceof byte[]) {
- value.apply(deserializeAggregatedValue((byte[]) rawValue));
+ // Skip zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ value.apply(deserializeAggregatedValue(bytes));
+ }
} else {
double doubleValue = ((Number) rawValue).doubleValue();
value.apply(doubleValue, doubleValue);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
index a57f324..c9a9e8c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
public QuantileDigest getInitialAggregatedValue(Object rawValue) {
QuantileDigest initialValue;
if (rawValue instanceof byte[]) {
+ // Use default value for zero-length byte array
byte[] bytes = (byte[]) rawValue;
- initialValue = deserializeAggregatedValue(bytes);
- _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ if (bytes.length != 0) {
+ initialValue = deserializeAggregatedValue(bytes);
+ _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ } else {
+ initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
+ _maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
+ }
} else {
- initialValue = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+ initialValue = PercentileEstAggregationFunction.getDefaultQuantileDigest();
initialValue.add(((Number) rawValue).longValue());
_maxByteSize = Math.max(_maxByteSize, initialValue.getByteSize());
}
@@ -58,11 +64,16 @@ public class PercentileEstValueAggregator implements ValueAggregator<Object, Qua
@Override
public QuantileDigest applyRawValue(QuantileDigest value, Object rawValue) {
if (rawValue instanceof byte[]) {
- value.merge(deserializeAggregatedValue((byte[]) rawValue));
+ // Skip zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ value.merge(deserializeAggregatedValue(bytes));
+ _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
+ }
} else {
value.add(((Number) rawValue).longValue());
+ _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
}
- _maxByteSize = Math.max(_maxByteSize, value.getByteSize());
return value;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
index 9d398eb..545ed7f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
@@ -44,11 +44,17 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
public TDigest getInitialAggregatedValue(Object rawValue) {
TDigest initialValue;
if (rawValue instanceof byte[]) {
+ // Use default value for zero-length byte array
byte[] bytes = (byte[]) rawValue;
- initialValue = deserializeAggregatedValue(bytes);
- _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ if (bytes.length != 0) {
+ initialValue = deserializeAggregatedValue(bytes);
+ _maxByteSize = Math.max(_maxByteSize, bytes.length);
+ } else {
+ initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
+ _maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
+ }
} else {
- initialValue = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+ initialValue = PercentileTDigestAggregationFunction.getDefaultTDigest();
initialValue.add(((Number) rawValue).doubleValue());
_maxByteSize = Math.max(_maxByteSize, initialValue.byteSize());
}
@@ -58,11 +64,16 @@ public class PercentileTDigestValueAggregator implements ValueAggregator<Object,
@Override
public TDigest applyRawValue(TDigest value, Object rawValue) {
if (rawValue instanceof byte[]) {
- value.add(deserializeAggregatedValue((byte[]) rawValue));
+ // Skip zero-length byte array
+ byte[] bytes = (byte[]) rawValue;
+ if (bytes.length != 0) {
+ value.add(deserializeAggregatedValue(bytes));
+ _maxByteSize = Math.max(_maxByteSize, value.byteSize());
+ }
} else {
- value.add(((Number) rawValue).doubleValue());
+ value.add(((Number) rawValue).longValue());
+ _maxByteSize = Math.max(_maxByteSize, value.byteSize());
}
- _maxByteSize = Math.max(_maxByteSize, value.byteSize());
return value;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 6aef5f0..a56ffbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
public class AvgAggregationFunction implements AggregationFunction<AvgPair, Double> {
private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY;
+ public static AvgPair getDefaultAvgPair() {
+ return new AvgPair(0.0, 0L);
+ }
+
@Nonnull
@Override
public AggregationFunctionType getType() {
@@ -71,10 +75,10 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
double sum = 0.0;
for (int i = 0; i < length; i++) {
- sum += valueArray[i];
+ sum += doubleValues[i];
}
setAggregationResult(aggregationResultHolder, sum, (long) length);
break;
@@ -84,9 +88,12 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
sum = 0.0;
long count = 0L;
for (int i = 0; i < length; i++) {
- AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- sum += value.getSum();
- count += value.getCount();
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ sum += value.getSum();
+ count += value.getCount();
+ }
}
setAggregationResult(aggregationResultHolder, sum, count);
break;
@@ -114,17 +121,20 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- setGroupByResult(groupKeyArray[i], groupByResultHolder, valueArray[i], 1L);
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, doubleValues[i], 1L);
}
break;
case BYTES:
// Serialized AvgPair
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getSum(), value.getCount());
+ }
}
break;
default:
@@ -141,9 +151,9 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ double value = doubleValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, 1L);
}
@@ -153,11 +163,14 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
// Serialized AvgPair
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
- double sum = value.getSum();
- long count = value.getCount();
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, sum, count);
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ AvgPair value = ObjectSerDeUtils.AVG_PAIR_SER_DE.deserialize(bytesValues[i]);
+ double sum = value.getSum();
+ long count = value.getCount();
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, sum, count);
+ }
}
}
break;
@@ -181,7 +194,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
public AvgPair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
AvgPair avgPair = aggregationResultHolder.getResult();
if (avgPair == null) {
- return new AvgPair(0.0, 0L);
+ return getDefaultAvgPair();
} else {
return avgPair;
}
@@ -192,7 +205,7 @@ public class AvgAggregationFunction implements AggregationFunction<AvgPair, Doub
public AvgPair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
AvgPair avgPair = groupByResultHolder.getResult(groupKey);
if (avgPair == null) {
- return new AvgPair(0.0, 0L);
+ return getDefaultAvgPair();
} else {
return avgPair;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index f35709c..25826e0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -34,6 +34,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
public class DistinctCountHLLAggregationFunction implements AggregationFunction<HyperLogLog, Long> {
public static final int DEFAULT_LOG2M = 8;
+ public static HyperLogLog getDefaultHyperLogLog() {
+ return new HyperLogLog(DEFAULT_LOG2M);
+ }
+
@Nonnull
@Override
public AggregationFunctionType getType() {
@@ -105,7 +109,10 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
try {
for (int i = 0; i < length; i++) {
- hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ }
}
} catch (Exception e) {
throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -156,8 +163,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
try {
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
- ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ setValueForGroupKey(groupByResultHolder, groupKeyArray[i],
+ ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ }
}
} catch (Exception e) {
throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -208,8 +218,11 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
try {
for (int i = 0; i < length; i++) {
- setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
- ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i],
+ ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
+ }
}
} catch (Exception e) {
throw new RuntimeException("Caught exception while aggregating HyperLogLog", e);
@@ -225,7 +238,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
public HyperLogLog extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
if (hyperLogLog == null) {
- return new HyperLogLog(DEFAULT_LOG2M);
+ return getDefaultHyperLogLog();
} else {
return hyperLogLog;
}
@@ -236,7 +249,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
public HyperLogLog extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
if (hyperLogLog == null) {
- return new HyperLogLog(DEFAULT_LOG2M);
+ return getDefaultHyperLogLog();
} else {
return hyperLogLog;
}
@@ -335,7 +348,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
protected static HyperLogLog getHyperLogLog(@Nonnull AggregationResultHolder aggregationResultHolder) {
HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
if (hyperLogLog == null) {
- hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+ hyperLogLog = getDefaultHyperLogLog();
aggregationResultHolder.setValue(hyperLogLog);
}
return hyperLogLog;
@@ -351,7 +364,7 @@ public class DistinctCountHLLAggregationFunction implements AggregationFunction<
protected static HyperLogLog getHyperLogLog(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
if (hyperLogLog == null) {
- hyperLogLog = new HyperLogLog(DEFAULT_LOG2M);
+ hyperLogLog = getDefaultHyperLogLog();
groupByResultHolder.setValueForKey(groupKey, hyperLogLog);
}
return hyperLogLog;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 717817c..84d5e06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -32,6 +32,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMaxRangePair, Double> {
+ public static MinMaxRangePair getDefaultMinMaxRangePair() {
+ return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+ }
+
@Nonnull
@Override
public AggregationFunctionType getType() {
@@ -70,11 +74,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ double value = doubleValues[i];
if (value < min) {
min = value;
}
@@ -90,12 +94,15 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
min = Double.POSITIVE_INFINITY;
max = Double.NEGATIVE_INFINITY;
for (int i = 0; i < length; i++) {
- MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- if (value.getMin() < min) {
- min = value.getMin();
- }
- if (value.getMax() > max) {
- max = value.getMax();
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ if (value.getMin() < min) {
+ min = value.getMin();
+ }
+ if (value.getMax() > max) {
+ max = value.getMax();
+ }
}
}
setAggregationResult(aggregationResultHolder, min, max);
@@ -124,9 +131,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ double value = doubleValues[i];
setGroupByResult(groupKeyArray[i], groupByResultHolder, value, value);
}
break;
@@ -134,8 +141,11 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
// Serialized MinMaxRangePair
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, value.getMin(), value.getMax());
+ }
}
break;
default:
@@ -152,9 +162,9 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ double value = doubleValues[i];
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, value, value);
}
@@ -164,9 +174,12 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
// Serialized MinMaxRangePair
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ MinMaxRangePair value = ObjectSerDeUtils.MIN_MAX_RANGE_PAIR_SER_DE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value.getMin(), value.getMax());
+ }
}
}
break;
@@ -190,7 +203,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
public MinMaxRangePair extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
MinMaxRangePair minMaxRangePair = aggregationResultHolder.getResult();
if (minMaxRangePair == null) {
- return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+ return getDefaultMinMaxRangePair();
} else {
return minMaxRangePair;
}
@@ -201,7 +214,7 @@ public class MinMaxRangeAggregationFunction implements AggregationFunction<MinMa
public MinMaxRangePair extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
MinMaxRangePair minMaxRangePair = groupByResultHolder.getResult(groupKey);
if (minMaxRangePair == null) {
- return new MinMaxRangePair(Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
+ return getDefaultMinMaxRangePair();
} else {
return minMaxRangePair;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
index 3c2c91b..6c0b8ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
@@ -33,6 +33,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
public class PercentileEstAggregationFunction implements AggregationFunction<QuantileDigest, Long> {
public static final double DEFAULT_MAX_ERROR = 0.05;
+ public static QuantileDigest getDefaultQuantileDigest() {
+ return new QuantileDigest(DEFAULT_MAX_ERROR);
+ }
+
protected final int _percentile;
public PercentileEstAggregationFunction(int percentile) {
@@ -79,16 +83,19 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ long[] longValues = blockValSets[0].getLongValuesSV();
for (int i = 0; i < length; i++) {
- quantileDigest.add((long) valueArray[i]);
+ quantileDigest.add(longValues[i]);
}
break;
case BYTES:
// Serialized QuantileDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+ }
}
break;
default:
@@ -105,18 +112,21 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ long[] longValues = blockValSets[0].getLongValuesSV();
for (int i = 0; i < length; i++) {
QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
- quantileDigest.add((long) valueArray[i]);
+ quantileDigest.add(longValues[i]);
}
break;
case BYTES:
// Serialized QuantileDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
- quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKeyArray[i]);
+ quantileDigest.merge(ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]));
+ }
}
break;
default:
@@ -133,12 +143,12 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ long[] longValues = blockValSets[0].getLongValuesSV();
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ long value = longValues[i];
for (int groupKey : groupKeysArray[i]) {
QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
- quantileDigest.add((long) value);
+ quantileDigest.add(value);
}
}
break;
@@ -146,10 +156,13 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
// Serialized QuantileDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
- for (int groupKey : groupKeysArray[i]) {
- QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
- quantileDigest.merge(value);
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ QuantileDigest value = ObjectSerDeUtils.QUANTILE_DIGEST_SER_DE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ QuantileDigest quantileDigest = getQuantileDigest(groupByResultHolder, groupKey);
+ quantileDigest.merge(value);
+ }
}
}
break;
@@ -163,7 +176,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
public QuantileDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
QuantileDigest quantileDigest = aggregationResultHolder.getResult();
if (quantileDigest == null) {
- return new QuantileDigest(DEFAULT_MAX_ERROR);
+ return getDefaultQuantileDigest();
} else {
return quantileDigest;
}
@@ -174,7 +187,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
public QuantileDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
if (quantileDigest == null) {
- return new QuantileDigest(DEFAULT_MAX_ERROR);
+ return getDefaultQuantileDigest();
} else {
return quantileDigest;
}
@@ -214,7 +227,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
protected static QuantileDigest getQuantileDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
QuantileDigest quantileDigest = aggregationResultHolder.getResult();
if (quantileDigest == null) {
- quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+ quantileDigest = getDefaultQuantileDigest();
aggregationResultHolder.setValue(quantileDigest);
}
return quantileDigest;
@@ -230,7 +243,7 @@ public class PercentileEstAggregationFunction implements AggregationFunction<Qua
protected static QuantileDigest getQuantileDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
QuantileDigest quantileDigest = groupByResultHolder.getResult(groupKey);
if (quantileDigest == null) {
- quantileDigest = new QuantileDigest(DEFAULT_MAX_ERROR);
+ quantileDigest = getDefaultQuantileDigest();
groupByResultHolder.setValueForKey(groupKey, quantileDigest);
}
return quantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
index 0d4db7f..d037535 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
@@ -37,6 +37,10 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder
public class PercentileTDigestAggregationFunction implements AggregationFunction<TDigest, Double> {
public static final int DEFAULT_TDIGEST_COMPRESSION = 100;
+ public static TDigest getDefaultTDigest() {
+ return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+ }
+
protected final int _percentile;
public PercentileTDigestAggregationFunction(int percentile) {
@@ -83,16 +87,19 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- tDigest.add(valueArray[i]);
+ tDigest.add(doubleValues[i]);
}
break;
case BYTES:
// Serialized TDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+ }
}
break;
default:
@@ -109,18 +116,21 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
- tDigest.add(valueArray[i]);
+ tDigest.add(doubleValues[i]);
}
break;
case BYTES:
// Serialized TDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
- tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ TDigest tDigest = getTDigest(groupByResultHolder, groupKeyArray[i]);
+ tDigest.add(ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i])));
+ }
}
break;
default:
@@ -137,9 +147,9 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
case LONG:
case FLOAT:
case DOUBLE:
- double[] valueArray = blockValSets[0].getDoubleValuesSV();
+ double[] doubleValues = blockValSets[0].getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- double value = valueArray[i];
+ double value = doubleValues[i];
for (int groupKey : groupKeysArray[i]) {
TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
tDigest.add(value);
@@ -149,10 +159,13 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
// Serialized TDigest
byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
for (int i = 0; i < length; i++) {
- TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
- for (int groupKey : groupKeysArray[i]) {
- TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
- tDigest.add(value);
+ // Skip zero-length byte array
+ if (bytesValues[i].length != 0) {
+ TDigest value = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(ByteBuffer.wrap(bytesValues[i]));
+ for (int groupKey : groupKeysArray[i]) {
+ TDigest tDigest = getTDigest(groupByResultHolder, groupKey);
+ tDigest.add(value);
+ }
}
}
break;
@@ -166,7 +179,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
public TDigest extractAggregationResult(@Nonnull AggregationResultHolder aggregationResultHolder) {
TDigest tDigest = aggregationResultHolder.getResult();
if (tDigest == null) {
- return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+ return getDefaultTDigest();
} else {
return tDigest;
}
@@ -177,7 +190,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
public TDigest extractGroupByResult(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
TDigest tDigest = groupByResultHolder.getResult(groupKey);
if (tDigest == null) {
- return TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+ return getDefaultTDigest();
} else {
return tDigest;
}
@@ -229,7 +242,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
protected static TDigest getTDigest(@Nonnull AggregationResultHolder aggregationResultHolder) {
TDigest tDigest = aggregationResultHolder.getResult();
if (tDigest == null) {
- tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+ tDigest = getDefaultTDigest();
aggregationResultHolder.setValue(tDigest);
}
return tDigest;
@@ -245,7 +258,7 @@ public class PercentileTDigestAggregationFunction implements AggregationFunction
protected static TDigest getTDigest(@Nonnull GroupByResultHolder groupByResultHolder, int groupKey) {
TDigest tDigest = groupByResultHolder.getResult(groupKey);
if (tDigest == null) {
- tDigest = TDigest.createMergingDigest(DEFAULT_TDIGEST_COMPRESSION);
+ tDigest = getDefaultTDigest();
groupByResultHolder.setValueForKey(groupKey, tDigest);
}
return tDigest;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8350ee6..74515b0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -137,7 +137,7 @@ public class ObjectSerDeUtilsTest {
@Test
public void testQuantileDigest() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
- QuantileDigest expected = new QuantileDigest(PercentileEstAggregationFunction.DEFAULT_MAX_ERROR);
+ QuantileDigest expected = PercentileEstAggregationFunction.getDefaultQuantileDigest();
int size = RANDOM.nextInt(100) + 1;
for (int j = 0; j < size; j++) {
expected.add(RANDOM.nextLong());
@@ -188,7 +188,7 @@ public class ObjectSerDeUtilsTest {
@Test
public void testTDigest() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
- TDigest expected = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+ TDigest expected = PercentileTDigestAggregationFunction.getDefaultTDigest();
int size = RANDOM.nextInt(100) + 1;
for (int j = 0; j < size; j++) {
expected.add(RANDOM.nextDouble());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
index 6d6d810..7749a44 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithBytesTypeTest.java
@@ -285,7 +285,7 @@ public class SegmentGenerationWithBytesTypeTest {
for (int i = 0; i < NUM_ROWS; i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
- TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+ TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
tDigest.add(_random.nextDouble());
ByteBuffer buffer = ByteBuffer.allocate(tDigest.byteSize());
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
similarity index 55%
copy from pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
copy to pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
index 67ab0fa..1686513 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DefaultBytesQueriesTest.java
@@ -18,17 +18,14 @@
*/
package org.apache.pinot.queries;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.tdunning.math.stats.TDigest;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.DimensionFieldSpec;
@@ -51,7 +48,9 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
+import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
@@ -60,34 +59,33 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
- * Tests for PERCENTILE_TDIGEST aggregation function.
+ * Tests for default bytes (zero-length byte array) values.
*
+ * <p>Aggregation functions that support bytes values:
* <ul>
- * <li>Generates a segment with a double column, a TDigest column and a group-by column</li>
- * <li>Runs aggregation and group-by queries on the generated segment</li>
- * <li>
- * Compares the results for PERCENTILE_TDIGEST on double column and TDigest column with results for PERCENTILE on
- * double column
- * </li>
+ * <li>AVG</li>
+ * <li>DISTINCTCOUNTHLL</li>
+ * <li>MINMAXRANGE</li>
+ * <li>PERCENTILEEST</li>
+ * <li>PERCENTILETDIGEST</li>
* </ul>
*/
-public class PercentileTDigestQueriesTest extends BaseQueriesTest {
- protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PercentileTDigestQueriesTest");
+public class DefaultBytesQueriesTest extends BaseQueriesTest {
+ protected static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DefaultBytesQueriesTest");
protected static final String TABLE_NAME = "testTable";
protected static final String SEGMENT_NAME = "testSegment";
protected static final int NUM_ROWS = 1000;
- protected static final double VALUE_RANGE = Integer.MAX_VALUE;
- protected static final double DELTA = 0.05 * VALUE_RANGE; // Allow 5% quantile error
- protected static final String DOUBLE_COLUMN = "doubleColumn";
- protected static final String TDIGEST_COLUMN = "tDigestColumn";
+ protected static final String BYTES_COLUMN = "bytesColumn";
protected static final String GROUP_BY_COLUMN = "groupByColumn";
protected static final String[] GROUPS = new String[]{"G1", "G2", "G3"};
protected static final long RANDOM_SEED = System.nanoTime();
protected static final Random RANDOM = new Random(RANDOM_SEED);
- protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
private ImmutableSegment _indexSegment;
private List<SegmentDataManager> _segmentDataManagers;
@@ -118,20 +116,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new ImmutableSegmentDataManager(_indexSegment));
}
- protected void buildSegment()
+ private void buildSegment()
throws Exception {
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
HashMap<String, Object> valueMap = new HashMap<>();
- double value = RANDOM.nextDouble() * VALUE_RANGE;
- valueMap.put(DOUBLE_COLUMN, value);
-
- TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
- tDigest.add(value);
- ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
- tDigest.asBytes(byteBuffer);
- valueMap.put(TDIGEST_COLUMN, byteBuffer.array());
+ valueMap.put(BYTES_COLUMN, FieldSpec.DEFAULT_METRIC_NULL_VALUE_OF_BYTES);
String group = GROUPS[RANDOM.nextInt(GROUPS.length)];
valueMap.put(GROUP_BY_COLUMN, group);
@@ -142,15 +133,13 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
}
Schema schema = new Schema();
- schema.addField(new MetricFieldSpec(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE));
- schema.addField(new MetricFieldSpec(TDIGEST_COLUMN, FieldSpec.DataType.BYTES));
+ schema.addField(new MetricFieldSpec(BYTES_COLUMN, FieldSpec.DataType.BYTES));
schema.addField(new DimensionFieldSpec(GROUP_BY_COLUMN, FieldSpec.DataType.STRING, true));
SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
config.setOutDir(INDEX_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
- config.setRawIndexCreationColumns(Collections.singletonList(TDIGEST_COLUMN));
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
try (RecordReader recordReader = new GenericRowRecordReader(rows, schema)) {
@@ -162,97 +151,128 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
@Test
public void testInnerSegmentAggregation() {
// For inner segment case, percentile does not affect the intermediate result
- AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery(0));
+ AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery());
IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
List<Object> aggregationResult = resultsBlock.getAggregationResult();
Assert.assertNotNull(aggregationResult);
- Assert.assertEquals(aggregationResult.size(), 3);
- DoubleList doubleList = (DoubleList) aggregationResult.get(0);
- Collections.sort(doubleList);
- assertTDigest((TDigest) aggregationResult.get(1), doubleList);
- assertTDigest((TDigest) aggregationResult.get(2), doubleList);
+ assertEquals(aggregationResult.size(), 5);
+ // Avg
+ AvgPair avgPair = (AvgPair) aggregationResult.get(0);
+ assertEquals(avgPair.getSum(), 0.0);
+ assertEquals(avgPair.getCount(), 0L);
+ // DistinctCountHLL
+ HyperLogLog hyperLogLog = (HyperLogLog) aggregationResult.get(1);
+ assertEquals(hyperLogLog.cardinality(), 0L);
+ // MinMaxRange
+ MinMaxRangePair minMaxRangePair = (MinMaxRangePair) aggregationResult.get(2);
+ assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+ assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+ // PercentileEst
+ QuantileDigest quantileDigest = (QuantileDigest) aggregationResult.get(3);
+ assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+ // PercentileTDigest
+ TDigest tDigest = (TDigest) aggregationResult.get(4);
+ assertTrue(Double.isNaN(tDigest.quantile(0.5)));
}
@Test
public void testInterSegmentAggregation() {
for (int percentile = 0; percentile <= 100; percentile++) {
- BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery(percentile));
+ BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getAggregationQuery());
List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
Assert.assertNotNull(aggregationResults);
- Assert.assertEquals(aggregationResults.size(), 3);
- double expected = Double.parseDouble((String) aggregationResults.get(0).getValue());
- double resultForDoubleColumn = Double.parseDouble((String) aggregationResults.get(1).getValue());
- Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
- double resultForTDigestColumn = Double.parseDouble((String) aggregationResults.get(2).getValue());
- Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+ assertEquals(aggregationResults.size(), 5);
+ // Avg
+ assertEquals(Double.parseDouble((String) aggregationResults.get(0).getValue()), Double.NEGATIVE_INFINITY);
+ // DistinctCountHLL
+ assertEquals(Long.parseLong((String) aggregationResults.get(1).getValue()), 0L);
+ // MinMaxRange
+ assertEquals(Double.parseDouble((String) aggregationResults.get(2).getValue()), Double.NEGATIVE_INFINITY);
+ // PercentileEst
+ assertEquals(Long.parseLong((String) aggregationResults.get(3).getValue()), Long.MIN_VALUE);
+ // PercentileTDigest
+ assertTrue(Double.isNaN(Double.parseDouble((String) aggregationResults.get(4).getValue())));
}
}
@Test
public void testInnerSegmentGroupBy() {
// For inner segment case, percentile does not affect the intermediate result
- AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery(0));
+ AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery());
IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
Assert.assertNotNull(groupByResult);
Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = groupByResult.getGroupKeyIterator();
while (groupKeyIterator.hasNext()) {
GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
- DoubleList doubleList = (DoubleList) groupByResult.getResultForKey(groupKey, 0);
- Collections.sort(doubleList);
- assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 1), doubleList);
- assertTDigest((TDigest) groupByResult.getResultForKey(groupKey, 2), doubleList);
+ // Avg
+ AvgPair avgPair = (AvgPair) groupByResult.getResultForKey(groupKey, 0);
+ assertEquals(avgPair.getSum(), 0.0);
+ assertEquals(avgPair.getCount(), 0L);
+ // DistinctCountHLL
+ HyperLogLog hyperLogLog = (HyperLogLog) groupByResult.getResultForKey(groupKey, 1);
+ assertEquals(hyperLogLog.cardinality(), 0L);
+ // MinMaxRange
+ MinMaxRangePair minMaxRangePair = (MinMaxRangePair) groupByResult.getResultForKey(groupKey, 2);
+ assertEquals(minMaxRangePair.getMax(), Double.NEGATIVE_INFINITY);
+ assertEquals(minMaxRangePair.getMin(), Double.POSITIVE_INFINITY);
+ // PercentileEst
+ QuantileDigest quantileDigest = (QuantileDigest) groupByResult.getResultForKey(groupKey, 3);
+ assertEquals(quantileDigest.getQuantile(0.5), Long.MIN_VALUE);
+ // PercentileTDigest
+ TDigest tDigest = (TDigest) groupByResult.getResultForKey(groupKey, 4);
+ assertTrue(Double.isNaN(tDigest.quantile(0.5)));
}
}
@Test
public void testInterSegmentGroupBy() {
for (int percentile = 0; percentile <= 100; percentile++) {
- BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery(percentile));
+ BrokerResponseNative brokerResponse = getBrokerResponseForQuery(getGroupByQuery());
List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
Assert.assertNotNull(aggregationResults);
- Assert.assertEquals(aggregationResults.size(), 3);
- Map<String, Double> expectedValues = new HashMap<>();
- for (GroupByResult groupByResult : aggregationResults.get(0).getGroupByResult()) {
- expectedValues.put(groupByResult.getGroup().get(0), Double.parseDouble((String) groupByResult.getValue()));
+ assertEquals(aggregationResults.size(), 5);
+ // Avg
+ List<GroupByResult> groupByResults = aggregationResults.get(0).getGroupByResult();
+ assertEquals(groupByResults.size(), 3);
+ for (GroupByResult groupByResult : groupByResults) {
+ assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
+ }
+ // DistinctCountHLL
+ groupByResults = aggregationResults.get(1).getGroupByResult();
+ assertEquals(groupByResults.size(), 3);
+ for (GroupByResult groupByResult : groupByResults) {
+ assertEquals(Long.parseLong((String) groupByResult.getValue()), 0L);
}
- for (GroupByResult groupByResult : aggregationResults.get(1).getGroupByResult()) {
- String group = groupByResult.getGroup().get(0);
- double expected = expectedValues.get(group);
- double resultForDoubleColumn = Double.parseDouble((String) groupByResult.getValue());
- Assert.assertEquals(resultForDoubleColumn, expected, DELTA, ERROR_MESSAGE);
+ // MinMaxRange
+ groupByResults = aggregationResults.get(2).getGroupByResult();
+ assertEquals(groupByResults.size(), 3);
+ for (GroupByResult groupByResult : groupByResults) {
+ assertEquals(Double.parseDouble((String) groupByResult.getValue()), Double.NEGATIVE_INFINITY);
}
- for (GroupByResult groupByResult : aggregationResults.get(2).getGroupByResult()) {
- String group = groupByResult.getGroup().get(0);
- double expected = expectedValues.get(group);
- double resultForTDigestColumn = Double.parseDouble((String) groupByResult.getValue());
- Assert.assertEquals(resultForTDigestColumn, expected, DELTA, ERROR_MESSAGE);
+ // PercentileEst
+ groupByResults = aggregationResults.get(3).getGroupByResult();
+ assertEquals(groupByResults.size(), 3);
+ for (GroupByResult groupByResult : groupByResults) {
+ assertEquals(Long.parseLong((String) groupByResult.getValue()), Long.MIN_VALUE);
+ }
+ // PercentileTDigest
+ groupByResults = aggregationResults.get(4).getGroupByResult();
+ assertEquals(groupByResults.size(), 3);
+ for (GroupByResult groupByResult : groupByResults) {
+ assertTrue(Double.isNaN(Double.parseDouble((String) groupByResult.getValue())));
}
}
}
- protected String getAggregationQuery(int percentile) {
- return String
- .format("SELECT PERCENTILE%d(%s), PERCENTILETDIGEST%d(%s), PERCENTILETDIGEST%d(%s) FROM %s", percentile,
- DOUBLE_COLUMN, percentile, DOUBLE_COLUMN, percentile, TDIGEST_COLUMN, TABLE_NAME);
+ private String getAggregationQuery() {
+ return String.format(
+ "SELECT AVG(%s), DISTINCTCOUNTHLL(%s), MINMAXRANGE(%s), PERCENTILEEST50(%s), PERCENTILETDIGEST50(%s) FROM %s",
+ BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, BYTES_COLUMN, TABLE_NAME);
}
- private String getGroupByQuery(int percentile) {
- return String.format("%s GROUP BY %s", getAggregationQuery(percentile), GROUP_BY_COLUMN);
- }
-
- private void assertTDigest(TDigest tDigest, DoubleList doubleList) {
- for (int percentile = 0; percentile <= 100; percentile++) {
- double expected;
- if (percentile == 100) {
- expected = doubleList.getDouble(doubleList.size() - 1);
- } else {
- expected = doubleList.getDouble(doubleList.size() * percentile / 100);
- }
- Assert
- .assertEquals(PercentileTDigestAggregationFunction.calculatePercentile(tDigest, percentile), expected, DELTA,
- ERROR_MESSAGE);
- }
+ private String getGroupByQuery() {
+ return String.format("%s GROUP BY %s", getAggregationQuery(), GROUP_BY_COLUMN);
}
@AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
index 4f2bf59..a9f6218 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestMVQueriesTest.java
@@ -60,7 +60,7 @@ public class PercentileTDigestMVQueriesTest extends PercentileTDigestQueriesTest
int numMultiValues = RANDOM.nextInt(MAX_NUM_MULTI_VALUES) + 1;
Double[] values = new Double[numMultiValues];
- TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+ TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
for (int j = 0; j < numMultiValues; j++) {
double value = RANDOM.nextDouble() * VALUE_RANGE;
values[j] = value;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index 67ab0fa..6206120 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -127,7 +127,7 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
double value = RANDOM.nextDouble() * VALUE_RANGE;
valueMap.put(DOUBLE_COLUMN, value);
- TDigest tDigest = TDigest.createMergingDigest(PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION);
+ TDigest tDigest = PercentileTDigestAggregationFunction.getDefaultTDigest();
tDigest.add(value);
ByteBuffer byteBuffer = ByteBuffer.allocate(tDigest.byteSize());
tDigest.asBytes(byteBuffer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org