You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/01 01:49:30 UTC
[incubator-pinot] branch master updated: Add DistinctCountBitmap
aggregation function (#5766)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a3efba4 Add DistinctCountBitmap aggregation function (#5766)
a3efba4 is described below
commit a3efba4aef942e76b024205ef2e67ae656dd8cc5
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Fri Jul 31 18:49:14 2020 -0700
Add DistinctCountBitmap aggregation function (#5766)
We currently use `IntHashSet` to store distinct values for `DistinctCount` aggregation function.
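Added a new `DistinctCountBitmap` aggregation function that uses `RoaringBitmap` to store the distinct values; this is more efficient, especially for serialization/deserialization. INT values are stored as-is, while values of other types are stored by hash code, so values with the same hash code are counted only once.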
Also added star-tree support for this new function.
---
.../common/function/AggregationFunctionType.java | 2 +
.../apache/pinot/core/common/ObjectSerDeUtils.java | 37 +-
.../DistinctCountBitmapValueAggregator.java | 94 +++++
.../data/aggregator/ValueAggregatorFactory.java | 7 +-
.../function/AggregationFunctionFactory.java | 4 +
.../function/AggregationFunctionVisitorBase.java | 6 +
.../DistinctCountBitmapAggregationFunction.java | 415 +++++++++++++++++++++
.../DistinctCountBitmapMVAggregationFunction.java | 261 +++++++++++++
.../v2/DistinctCountBitmapStarTreeV2Test.java | 51 +++
.../queries/DistinctCountBitmapQueriesTest.java | 248 ++++++++++++
.../tests/StarTreeClusterIntegrationTest.java | 3 +-
11 files changed, 1123 insertions(+), 5 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index 5125a8d..cf48ea6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -27,6 +27,7 @@ public enum AggregationFunctionType {
AVG("avg"),
MINMAXRANGE("minMaxRange"),
DISTINCTCOUNT("distinctCount"),
+ DISTINCTCOUNTBITMAP("distinctCountBitmap"),
DISTINCTCOUNTHLL("distinctCountHLL"),
DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
FASTHLL("fastHLL"),
@@ -47,6 +48,7 @@ public enum AggregationFunctionType {
AVGMV("avgMV"),
MINMAXRANGEMV("minMaxRangeMV"),
DISTINCTCOUNTMV("distinctCountMV"),
+ DISTINCTCOUNTBITMAPMV("distinctCountBitmapMV"),
DISTINCTCOUNTHLLMV("distinctCountHLLMV"),
DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
PERCENTILEMV("percentileMV"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 637d5e1..9c87921 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTab
import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.RoaringBitmap;
/**
@@ -66,9 +67,10 @@ public class ObjectSerDeUtils {
TDigest(10),
DistinctTable(11),
DataSketch(12),
- Geometry(13);
+ Geometry(13),
+ RoaringBitmap(14);
- private int _value;
+ private final int _value;
ObjectType(int value) {
_value = value;
@@ -107,6 +109,8 @@ public class ObjectSerDeUtils {
return ObjectType.DataSketch;
} else if (value instanceof Geometry) {
return ObjectType.Geometry;
+ } else if (value instanceof RoaringBitmap) {
+ return ObjectType.RoaringBitmap;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
@@ -506,6 +510,32 @@ public class ObjectSerDeUtils {
}
};
+  /**
+   * Ser/de for {@link RoaringBitmap}, delegating to the bitmap's own {@code serialize}/{@code deserialize} methods.
+   * Deserialization failures are rethrown as unchecked exceptions.
+   */
+  public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new ObjectSerDe<RoaringBitmap>() {
+    @Override
+    public byte[] serialize(RoaringBitmap bitmap) {
+      // Allocate exactly the serialized size so the backing array can be handed out as-is
+      ByteBuffer target = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+      bitmap.serialize(target);
+      return target.array();
+    }
+
+    @Override
+    public RoaringBitmap deserialize(byte[] bytes) {
+      ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+      return deserialize(wrapped);
+    }
+
+    @Override
+    public RoaringBitmap deserialize(ByteBuffer byteBuffer) {
+      RoaringBitmap result = new RoaringBitmap();
+      try {
+        result.deserialize(byteBuffer);
+        return result;
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while deserializing RoaringBitmap", e);
+      }
+    }
+  };
+
// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
@@ -522,7 +552,8 @@ public class ObjectSerDeUtils {
TDIGEST_SER_DE,
DISTINCT_TABLE_SER_DE,
DATA_SKETCH_SER_DE,
- GEOMETRY_SER_DE
+ GEOMETRY_SER_DE,
+ ROARING_BITMAP_SER_DE
};
//@formatter:on
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
new file mode 100644
index 0000000..3244267
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.aggregator;
+
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Star-tree {@link ValueAggregator} for {@code DISTINCTCOUNTBITMAP}. Aggregated values are RoaringBitmaps and are
+ * stored as {@code BYTES} (serialized via {@code ObjectSerDeUtils.ROARING_BITMAP_SER_DE}).
+ * <p>A {@code byte[]} raw value is treated as a serialized RoaringBitmap and merged into the aggregate; any other raw
+ * value contributes its {@code hashCode()} to the bitmap.
+ */
+public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, RoaringBitmap> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Largest byte size observed so far for a (serialized) aggregated value
+  private int _maxByteSize;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  @Override
+  public RoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    if (!(rawValue instanceof byte[])) {
+      // Non-bytes raw value: start a bitmap holding only its hash code
+      RoaringBitmap singleton = new RoaringBitmap();
+      singleton.add(rawValue.hashCode());
+      recordByteSize(singleton.serializedSizeInBytes());
+      return singleton;
+    }
+    byte[] serialized = (byte[]) rawValue;
+    RoaringBitmap deserialized = deserializeAggregatedValue(serialized);
+    recordByteSize(serialized.length);
+    return deserialized;
+  }
+
+  @Override
+  public RoaringBitmap applyRawValue(RoaringBitmap value, Object rawValue) {
+    if (!(rawValue instanceof byte[])) {
+      value.add(rawValue.hashCode());
+    } else {
+      value.or(deserializeAggregatedValue((byte[]) rawValue));
+    }
+    recordByteSize(value.serializedSizeInBytes());
+    return value;
+  }
+
+  @Override
+  public RoaringBitmap applyAggregatedValue(RoaringBitmap value, RoaringBitmap aggregatedValue) {
+    value.or(aggregatedValue);
+    recordByteSize(value.serializedSizeInBytes());
+    return value;
+  }
+
+  @Override
+  public RoaringBitmap cloneAggregatedValue(RoaringBitmap value) {
+    return value.clone();
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(RoaringBitmap value) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(value);
+  }
+
+  @Override
+  public RoaringBitmap deserializeAggregatedValue(byte[] bytes) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytes);
+  }
+
+  /**
+   * Raises the tracked maximum aggregated-value byte size to {@code byteSize} when it is larger.
+   */
+  private void recordByteSize(int byteSize) {
+    if (byteSize > _maxByteSize) {
+      _maxByteSize = byteSize;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
index 94dee04..d93e974 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.core.data.aggregator;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
/**
* The {@code ValueAggregatorFactory} class is the factory for all value aggregators.
*/
+@SuppressWarnings("rawtypes")
public class ValueAggregatorFactory {
private ValueAggregatorFactory() {
}
@@ -49,6 +50,8 @@ public class ValueAggregatorFactory {
return new AvgValueAggregator();
case MINMAXRANGE:
return new MinMaxRangeValueAggregator();
+ case DISTINCTCOUNTBITMAP:
+ return new DistinctCountBitmapValueAggregator();
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTRAWHLL:
return new DistinctCountHLLValueAggregator();
@@ -81,6 +84,8 @@ public class ValueAggregatorFactory {
return AvgValueAggregator.AGGREGATED_VALUE_TYPE;
case MINMAXRANGE:
return MinMaxRangeValueAggregator.AGGREGATED_VALUE_TYPE;
+ case DISTINCTCOUNTBITMAP:
+ return DistinctCountBitmapValueAggregator.AGGREGATED_VALUE_TYPE;
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTRAWHLL:
return DistinctCountHLLValueAggregator.AGGREGATED_VALUE_TYPE;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index bfe8550..3a1bb01 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -121,6 +121,8 @@ public class AggregationFunctionFactory {
return new MinMaxRangeAggregationFunction(firstArgument);
case DISTINCTCOUNT:
return new DistinctCountAggregationFunction(firstArgument);
+ case DISTINCTCOUNTBITMAP:
+ return new DistinctCountBitmapAggregationFunction(firstArgument);
case DISTINCTCOUNTHLL:
return new DistinctCountHLLAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLL:
@@ -145,6 +147,8 @@ public class AggregationFunctionFactory {
return new MinMaxRangeMVAggregationFunction(firstArgument);
case DISTINCTCOUNTMV:
return new DistinctCountMVAggregationFunction(firstArgument);
+ case DISTINCTCOUNTBITMAPMV:
+ return new DistinctCountBitmapMVAggregationFunction(firstArgument);
case DISTINCTCOUNTHLLMV:
return new DistinctCountHLLMVAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLLMV:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 8e0a8a6..72c8d4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -45,6 +45,12 @@ public class AggregationFunctionVisitorBase {
public void visit(DistinctCountMVAggregationFunction function) {
}
+ public void visit(DistinctCountBitmapAggregationFunction function) {
+ }
+
+ public void visit(DistinctCountBitmapMVAggregationFunction function) {
+ }
+
public void visit(DistinctCountHLLAggregationFunction function) {
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
new file mode 100644
index 0000000..fbb5153
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountBitmapAggregationFunction} calculates the number of distinct values for a given single-value
+ * expression using RoaringBitmap. The bitmap stores the actual values for {@code INT} expression, or hash code of the
+ * values for other data types (values with the same hash code will only be counted once).
+ * <p>How input values reach the bitmap:
+ * <ul>
+ *   <li>{@code BYTES} values are treated as serialized RoaringBitmaps and OR-ed into the result.</li>
+ *   <li>For dictionary-encoded expressions, dictionary ids are collected while aggregating and converted to values
+ *   (or hash codes of the values) when the result is extracted.</li>
+ *   <li>For other expressions, {@code INT} values are added as-is and {@code LONG}/{@code FLOAT}/{@code DOUBLE}/
+ *   {@code STRING} values are added by hash code.</li>
+ * </ul>
+ */
+public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<RoaringBitmap, Integer> {
+  // Dictionary of the last dictionary-encoded block seen by the aggregate methods. When non-null, the extract methods
+  // treat the collected bitmap as dictionary ids and convert them with this dictionary.
+  // NOTE(review): this is mutable instance state. If one function instance aggregates several segments (each with its
+  // own dictionary, or a mix of dictionary-encoded and raw/BYTES inputs), possibly from different threads, ids may be
+  // converted with the wrong dictionary. TODO confirm how instances are shared, and consider keeping the dictionary
+  // next to the bitmap in the result holder instead.
+  protected Dictionary _dictionary;
+
+  public DistinctCountBitmapAggregationFunction(ExpressionContext expression) {
+    super(expression);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Adds the first {@code length} values of the block into the single (non-group-by) result bitmap.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      RoaringBitmap bitmap = aggregationResultHolder.getResult();
+      int startIndex = 0;
+      if (bitmap == null) {
+        if (length == 0) {
+          // Nothing to aggregate; leave the result holder empty (bytesValues[0] may not exist)
+          return;
+        }
+        // Use the first deserialized bitmap as the accumulator instead of OR-ing it into an empty bitmap
+        bitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[0]);
+        aggregationResultHolder.setValue(bitmap);
+        startIndex = 1;
+      }
+      for (int i = startIndex; i < length; i++) {
+        bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]));
+      }
+      return;
+    }
+
+    RoaringBitmap bitmap = getBitmap(aggregationResultHolder);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      bitmap.addN(dictIds, 0, length);
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        bitmap.addN(intValues, 0, length);
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds the value of each row {@code i} into the bitmap of group {@code groupKeyArray[i]}.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        int groupKey = groupKeyArray[i];
+        RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+        if (bitmap != null) {
+          bitmap.or(value);
+        } else {
+          // The freshly deserialized bitmap is owned by this row only, so it can become the group's bitmap directly
+          groupByResultHolder.setValueForKey(groupKey, value);
+        }
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        getBitmap(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds the (single) value of each row {@code i} into the bitmap of every group key in {@code groupKeysArray[i]}.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        for (int groupKey : groupKeysArray[i]) {
+          RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+          if (bitmap != null) {
+            bitmap.or(value);
+          } else {
+            // Clone a bitmap for the group so that groups never share (and mutate) the same instance
+            groupByResultHolder.setValueForKey(groupKey, value.clone());
+          }
+        }
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public RoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    RoaringBitmap bitmap = aggregationResultHolder.getResult();
+    return toIntermediateResult(bitmap);
+  }
+
+  @Override
+  public RoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    return toIntermediateResult(bitmap);
+  }
+
+  /**
+   * Merges {@code intermediateResult2} into {@code intermediateResult1} (mutating it) and returns it.
+   */
+  @Override
+  public RoaringBitmap merge(RoaringBitmap intermediateResult1, RoaringBitmap intermediateResult2) {
+    intermediateResult1.or(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.INT;
+  }
+
+  /**
+   * Returns the distinct count, i.e. the cardinality of the merged bitmap.
+   */
+  @Override
+  public Integer extractFinalResult(RoaringBitmap intermediateResult) {
+    return intermediateResult.getCardinality();
+  }
+
+  /**
+   * Returns the bitmap from the result holder or creates a new one if it does not exist.
+   */
+  protected static RoaringBitmap getBitmap(AggregationResultHolder aggregationResultHolder) {
+    RoaringBitmap bitmap = aggregationResultHolder.getResult();
+    if (bitmap == null) {
+      bitmap = new RoaringBitmap();
+      aggregationResultHolder.setValue(bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Returns the bitmap for the given group key or creates a new one if it does not exist.
+   */
+  protected static RoaringBitmap getBitmap(GroupByResultHolder groupByResultHolder, int groupKey) {
+    RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    if (bitmap == null) {
+      bitmap = new RoaringBitmap();
+      groupByResultHolder.setValueForKey(groupKey, bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Helper method to set value for the given group keys into the result holder.
+   */
+  private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
+    for (int groupKey : groupKeys) {
+      getBitmap(groupByResultHolder, groupKey).add(value);
+    }
+  }
+
+  /**
+   * Turns a collected bitmap (possibly {@code null}) into the intermediate result: {@code null} becomes an empty
+   * bitmap; when a dictionary has been recorded, the bitmap holds dictionary ids and is converted to values (or hash
+   * codes of the values); otherwise (serialized RoaringBitmap or non-dictionary-encoded input) it is returned as-is.
+   */
+  private RoaringBitmap toIntermediateResult(RoaringBitmap bitmap) {
+    if (bitmap == null) {
+      return new RoaringBitmap();
+    }
+    if (_dictionary != null) {
+      // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+      return convertToValueBitmap(bitmap, _dictionary);
+    }
+    // For serialized RoaringBitmap and non-dictionary-encoded expression, directly return the bitmap
+    return bitmap;
+  }
+
+  /**
+   * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded
+   * expression. {@code INT} values are stored as-is, other types by hash code, matching the raw-value code paths.
+   */
+  private static RoaringBitmap convertToValueBitmap(RoaringBitmap dictIdBitmap, Dictionary dictionary) {
+    RoaringBitmap valueBitmap = new RoaringBitmap();
+    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+    DataType valueType = dictionary.getValueType();
+    switch (valueType) {
+      case INT:
+        while (iterator.hasNext()) {
+          valueBitmap.add(dictionary.getIntValue(iterator.next()));
+        }
+        break;
+      case LONG:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Long.hashCode(dictionary.getLongValue(iterator.next())));
+        }
+        break;
+      case FLOAT:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Float.hashCode(dictionary.getFloatValue(iterator.next())));
+        }
+        break;
+      case DOUBLE:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Double.hashCode(dictionary.getDoubleValue(iterator.next())));
+        }
+        break;
+      case STRING:
+        while (iterator.hasNext()) {
+          valueBitmap.add(dictionary.getStringValue(iterator.next()).hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+    return valueBitmap;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java
new file mode 100644
index 0000000..2f5dbba
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountBitmapMVAggregationFunction} calculates the number of distinct values for a given multi-value
+ * expression using RoaringBitmap. The bitmap stores the actual values for {@code INT} expression, or hash code of the
+ * values for other data types (values with the same hash code will only be counted once).
+ */
+public class DistinctCountBitmapMVAggregationFunction extends DistinctCountBitmapAggregationFunction {
+
+ public DistinctCountBitmapMVAggregationFunction(ExpressionContext expression) {
+ super(expression);
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTBITMAPMV;
+ }
+
+ @Override
+ public void accept(AggregationFunctionVisitorBase visitor) {
+ visitor.visit(this);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ RoaringBitmap bitmap = getBitmap(aggregationResultHolder);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+ DataType valueType = blockValSet.getValueType();
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (long value : longValues[i]) {
+ bitmap.add(Long.hashCode(value));
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (float value : floatValues[i]) {
+ bitmap.add(Float.hashCode(value));
+ }
+ }
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (double value : doubleValues[i]) {
+ bitmap.add(Double.hashCode(value));
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (String value : stringValues[i]) {
+ bitmap.add(value.hashCode());
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ getBitmap(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+ DataType valueType = blockValSet.getValueType();
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ getBitmap(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+ for (long value : longValues[i]) {
+ bitmap.add(Long.hashCode(value));
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+ for (float value : floatValues[i]) {
+ bitmap.add(Float.hashCode(value));
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+ for (double value : doubleValues[i]) {
+ bitmap.add(Double.hashCode(value));
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+ for (String value : stringValues[i]) {
+ bitmap.add(value.hashCode());
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // For dictionary-encoded expression, store dictionary ids into the bitmap
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getBitmap(groupByResultHolder, groupKey).add(dictIds[i]);
+ }
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+ DataType valueType = blockValSet.getValueType();
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ getBitmap(groupByResultHolder, groupKey).add(intValues[i]);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+ for (long value : longValues[i]) {
+ bitmap.add(Long.hashCode(value));
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+ for (float value : floatValues[i]) {
+ bitmap.add(Float.hashCode(value));
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+ for (double value : doubleValues[i]) {
+ bitmap.add(Double.hashCode(value));
+ }
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+ for (String value : stringValues[i]) {
+ bitmap.add(value.hashCode());
+ }
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+ }
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java
new file mode 100644
index 0000000..23e9ca2
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.core.data.aggregator.DistinctCountBitmapValueAggregator;
+import org.apache.pinot.core.data.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DistinctCountBitmapStarTreeV2Test extends BaseStarTreeV2Test<Object, RoaringBitmap> {
+
+ @Override
+ ValueAggregator<Object, RoaringBitmap> getValueAggregator() {
+ return new DistinctCountBitmapValueAggregator();
+ }
+
+ @Override
+ DataType getRawValueType() {
+ return DataType.INT;
+ }
+
+ @Override
+ Object getRandomRawValue(Random random) {
+ return random.nextInt(100);
+ }
+
+ @Override
+ void assertAggregatedValue(RoaringBitmap starTreeResult, RoaringBitmap nonStarTreeResult) {
+ assertEquals(starTreeResult, nonStarTreeResult);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
new file mode 100644
index 0000000..8a7fa41
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for DISTINCT_COUNT_BITMAP queries.
+ * <p>Each record derives every column from one random int in {@code [0, MAX_VALUE)}. The LONG/FLOAT/DOUBLE/STRING
+ * columns hold the same number in another type. The BYTES column holds a serialized RoaringBitmap containing it.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountBitmapQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DistinctCountBitmapQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+  // Number of DISTINCTCOUNTBITMAP functions in each test query (one per column)
+  private static final int NUM_AGGREGATION_FUNCTIONS = 6;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String BYTES_COLUMN = "bytesColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  // Distinct values written to INT_COLUMN; these are also the expected group keys of the group-by query
+  private Set<Integer> _values;
+  // Expected distinct counts, indexed by the position of the aggregation function in the query
+  private int[] _expectedResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates the records, computes the expected distinct counts (hash-code based for non-INT columns, matching the
+   * function's semantics), then builds and loads the segment.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _values = new HashSet<>(hashMapCapacity);
+    Set<Integer> longResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> floatResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> doubleResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> stringResultSet = new HashSet<>(hashMapCapacity);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      int value = RANDOM.nextInt(MAX_VALUE);
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, value);
+      _values.add(Integer.hashCode(value));
+      record.putValue(LONG_COLUMN, (long) value);
+      longResultSet.add(Long.hashCode(value));
+      record.putValue(FLOAT_COLUMN, (float) value);
+      floatResultSet.add(Float.hashCode(value));
+      record.putValue(DOUBLE_COLUMN, (double) value);
+      doubleResultSet.add(Double.hashCode(value));
+      String stringValue = Integer.toString(value);
+      record.putValue(STRING_COLUMN, stringValue);
+      stringResultSet.add(stringValue.hashCode());
+      // Store serialized bitmaps in the BYTES column
+      RoaringBitmap bitmap = new RoaringBitmap();
+      bitmap.add(value);
+      byte[] bytesValue = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(bitmap);
+      record.putValue(BYTES_COLUMN, bytesValue);
+      records.add(record);
+    }
+    // Order matches the column order of the aggregation functions in the test queries
+    _expectedResults = new int[]{
+        _values.size(), longResultSet.size(), floatResultSet.size(), doubleResultSet.size(), stringResultSet.size(),
+        _values.size()
+    };
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Test
+  public void testAggregationOnly() {
+    String query =
+        "SELECT DISTINCTCOUNTBITMAP(intColumn), DISTINCTCOUNTBITMAP(longColumn), DISTINCTCOUNTBITMAP(floatColumn), "
+            + "DISTINCTCOUNTBITMAP(doubleColumn), DISTINCTCOUNTBITMAP(stringColumn), DISTINCTCOUNTBITMAP(bytesColumn) "
+            + "FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_AGGREGATION_FUNCTIONS * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    assertNotNull(aggregationResult);
+    for (int i = 0; i < NUM_AGGREGATION_FUNCTIONS; i++) {
+      assertEquals(((RoaringBitmap) aggregationResult.get(i)).getCardinality(), _expectedResults[i]);
+    }
+
+    // Inter segments
+    String[] expectedResults = new String[NUM_AGGREGATION_FUNCTIONS];
+    for (int i = 0; i < NUM_AGGREGATION_FUNCTIONS; i++) {
+      expectedResults[i] = Integer.toString(_expectedResults[i]);
+    }
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0,
+        4 * NUM_AGGREGATION_FUNCTIONS * NUM_RECORDS, 4 * NUM_RECORDS, expectedResults);
+  }
+
+  /**
+   * Groups by {@code intColumn}. Every other column is derived from it, so each group's bitmap must contain exactly one
+   * value for every aggregation function.
+   */
+  @Test
+  public void testAggregationGroupBy() {
+    String query =
+        "SELECT DISTINCTCOUNTBITMAP(intColumn), DISTINCTCOUNTBITMAP(longColumn), DISTINCTCOUNTBITMAP(floatColumn), "
+            + "DISTINCTCOUNTBITMAP(doubleColumn), DISTINCTCOUNTBITMAP(stringColumn), DISTINCTCOUNTBITMAP(bytesColumn) "
+            + "FROM testTable GROUP BY intColumn";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0,
+        NUM_AGGREGATION_FUNCTIONS * NUM_RECORDS, NUM_RECORDS);
+    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      assertTrue(_values.contains(Integer.parseInt(groupKey._stringKey)));
+      for (int i = 0; i < NUM_AGGREGATION_FUNCTIONS; i++) {
+        assertEquals(((RoaringBitmap) aggregationGroupByResult.getResultForKey(groupKey, i)).getCardinality(), 1);
+      }
+    }
+    assertEquals(numGroups, _values.size());
+
+    // Inter segments
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_AGGREGATION_FUNCTIONS * NUM_RECORDS);
+    assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    // Size of this list equals the number of aggregation functions since each one is returned separately
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    assertEquals(aggregationResults.size(), NUM_AGGREGATION_FUNCTIONS);
+    for (AggregationResult aggregationResult : aggregationResults) {
+      Assert.assertNull(aggregationResult.getValue());
+      List<GroupByResult> groupByResults = aggregationResult.getGroupByResult();
+      for (GroupByResult groupByResult : groupByResults) {
+        List<String> group = groupByResult.getGroup();
+        assertEquals(group.size(), 1);
+        assertTrue(_values.contains(Integer.parseInt(group.get(0))));
+        assertEquals(groupByResult.getValue(), Integer.toString(1));
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index 7b80f71..5de486d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -65,7 +65,8 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
private static final int NUM_STAR_TREE_METRICS = 5;
private static final List<AggregationFunctionType> AGGREGATION_FUNCTION_TYPES = Arrays
.asList(AggregationFunctionType.COUNT, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
- AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE);
+ AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE,
+ AggregationFunctionType.DISTINCTCOUNTBITMAP);
private static final int NUM_QUERIES_TO_GENERATE = 100;
private String _currentTable;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org