You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink that is not preserved in plain text.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/01 01:49:30 UTC

[incubator-pinot] branch master updated: Add DistinctCountBitmap aggregation function (#5766)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a3efba4  Add DistinctCountBitmap aggregation function (#5766)
a3efba4 is described below

commit a3efba4aef942e76b024205ef2e67ae656dd8cc5
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Fri Jul 31 18:49:14 2020 -0700

    Add DistinctCountBitmap aggregation function (#5766)
    
    We currently use `IntHashSet` to store distinct values for `DistinctCount` aggregation function.
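    Added a new `DistinctCountBitmap` aggregation function that stores the distinct values in a `RoaringBitmap`, which is more efficient, especially for serialization/deserialization (ser/de). For non-INT values the bitmap stores hash codes, so values whose hash codes collide are counted only once.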
    Also added star-tree support for this new function.
---
 .../common/function/AggregationFunctionType.java   |   2 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  37 +-
 .../DistinctCountBitmapValueAggregator.java        |  94 +++++
 .../data/aggregator/ValueAggregatorFactory.java    |   7 +-
 .../function/AggregationFunctionFactory.java       |   4 +
 .../function/AggregationFunctionVisitorBase.java   |   6 +
 .../DistinctCountBitmapAggregationFunction.java    | 415 +++++++++++++++++++++
 .../DistinctCountBitmapMVAggregationFunction.java  | 261 +++++++++++++
 .../v2/DistinctCountBitmapStarTreeV2Test.java      |  51 +++
 .../queries/DistinctCountBitmapQueriesTest.java    | 248 ++++++++++++
 .../tests/StarTreeClusterIntegrationTest.java      |   3 +-
 11 files changed, 1123 insertions(+), 5 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index 5125a8d..cf48ea6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -27,6 +27,7 @@ public enum AggregationFunctionType {
   AVG("avg"),
   MINMAXRANGE("minMaxRange"),
   DISTINCTCOUNT("distinctCount"),
+  DISTINCTCOUNTBITMAP("distinctCountBitmap"),
   DISTINCTCOUNTHLL("distinctCountHLL"),
   DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
   FASTHLL("fastHLL"),
@@ -47,6 +48,7 @@ public enum AggregationFunctionType {
   AVGMV("avgMV"),
   MINMAXRANGEMV("minMaxRangeMV"),
   DISTINCTCOUNTMV("distinctCountMV"),
+  DISTINCTCOUNTBITMAPMV("distinctCountBitmapMV"),
   DISTINCTCOUNTHLLMV("distinctCountHLLMV"),
   DISTINCTCOUNTRAWHLLMV("distinctCountRawHLLMV"),
   PERCENTILEMV("percentileMV"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 637d5e1..9c87921 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTab
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
 import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -66,9 +67,10 @@ public class ObjectSerDeUtils {
     TDigest(10),
     DistinctTable(11),
     DataSketch(12),
-    Geometry(13);
+    Geometry(13),
+    RoaringBitmap(14);
 
-    private int _value;
+    private final int _value;
 
     ObjectType(int value) {
       _value = value;
@@ -107,6 +109,8 @@ public class ObjectSerDeUtils {
         return ObjectType.DataSketch;
       } else if (value instanceof Geometry) {
         return ObjectType.Geometry;
+      } else if (value instanceof RoaringBitmap) {
+        return ObjectType.RoaringBitmap;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -506,6 +510,32 @@ public class ObjectSerDeUtils {
     }
   };
 
+  /**
+   * SerDe for {@link RoaringBitmap}, delegating to the bitmap's own serialize/deserialize methods.
+   */
+  public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new ObjectSerDe<RoaringBitmap>() {
+    @Override
+    public byte[] serialize(RoaringBitmap bitmap) {
+      // The buffer is sized exactly to the serialized size, so its backing array is the complete output
+      ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+      bitmap.serialize(buffer);
+      return buffer.array();
+    }
+
+    @Override
+    public RoaringBitmap deserialize(byte[] bytes) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      return deserialize(buffer);
+    }
+
+    @Override
+    public RoaringBitmap deserialize(ByteBuffer byteBuffer) {
+      try {
+        RoaringBitmap result = new RoaringBitmap();
+        result.deserialize(byteBuffer);
+        return result;
+      } catch (IOException e) {
+        // Surface the checked IOException as unchecked, keeping the cause
+        throw new RuntimeException("Caught exception while deserializing RoaringBitmap", e);
+      }
+    }
+  };
+
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -522,7 +552,8 @@ public class ObjectSerDeUtils {
       TDIGEST_SER_DE,
       DISTINCT_TABLE_SER_DE,
       DATA_SKETCH_SER_DE,
-      GEOMETRY_SER_DE
+      GEOMETRY_SER_DE,
+      ROARING_BITMAP_SER_DE
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
new file mode 100644
index 0000000..3244267
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.aggregator;
+
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * Star-tree value aggregator for {@code DISTINCTCOUNTBITMAP}. A {@code byte[]} raw value is treated as a serialized
+ * RoaringBitmap and merged; any other raw value contributes its {@code hashCode()} to the bitmap.
+ */
+public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, RoaringBitmap> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Largest serialized size (in bytes) of any aggregated value seen so far
+  private int _maxByteSize;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  @Override
+  public RoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      byte[] serializedBitmap = (byte[]) rawValue;
+      RoaringBitmap deserialized = deserializeAggregatedValue(serializedBitmap);
+      // The serialized form is already at hand, so its length is the serialized size
+      _maxByteSize = Math.max(_maxByteSize, serializedBitmap.length);
+      return deserialized;
+    }
+    RoaringBitmap singleValueBitmap = new RoaringBitmap();
+    singleValueBitmap.add(rawValue.hashCode());
+    return trackSize(singleValueBitmap);
+  }
+
+  @Override
+  public RoaringBitmap applyRawValue(RoaringBitmap value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      value.or(deserializeAggregatedValue((byte[]) rawValue));
+    } else {
+      value.add(rawValue.hashCode());
+    }
+    return trackSize(value);
+  }
+
+  @Override
+  public RoaringBitmap applyAggregatedValue(RoaringBitmap value, RoaringBitmap aggregatedValue) {
+    value.or(aggregatedValue);
+    return trackSize(value);
+  }
+
+  @Override
+  public RoaringBitmap cloneAggregatedValue(RoaringBitmap value) {
+    return value.clone();
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(RoaringBitmap value) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(value);
+  }
+
+  @Override
+  public RoaringBitmap deserializeAggregatedValue(byte[] bytes) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytes);
+  }
+
+  /**
+   * Records the serialized size of the given bitmap in {@code _maxByteSize} and returns the same bitmap.
+   */
+  private RoaringBitmap trackSize(RoaringBitmap bitmap) {
+    _maxByteSize = Math.max(_maxByteSize, bitmap.serializedSizeInBytes());
+    return bitmap;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
index 94dee04..d93e974 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.data.aggregator;
 
-import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
  * The {@code ValueAggregatorFactory} class is the factory for all value aggregators.
  */
+@SuppressWarnings("rawtypes")
 public class ValueAggregatorFactory {
   private ValueAggregatorFactory() {
   }
@@ -49,6 +50,8 @@ public class ValueAggregatorFactory {
         return new AvgValueAggregator();
       case MINMAXRANGE:
         return new MinMaxRangeValueAggregator();
+      case DISTINCTCOUNTBITMAP:
+        return new DistinctCountBitmapValueAggregator();
       case DISTINCTCOUNTHLL:
       case DISTINCTCOUNTRAWHLL:
         return new DistinctCountHLLValueAggregator();
@@ -81,6 +84,8 @@ public class ValueAggregatorFactory {
         return AvgValueAggregator.AGGREGATED_VALUE_TYPE;
       case MINMAXRANGE:
         return MinMaxRangeValueAggregator.AGGREGATED_VALUE_TYPE;
+      case DISTINCTCOUNTBITMAP:
+        return DistinctCountBitmapValueAggregator.AGGREGATED_VALUE_TYPE;
       case DISTINCTCOUNTHLL:
       case DISTINCTCOUNTRAWHLL:
         return DistinctCountHLLValueAggregator.AGGREGATED_VALUE_TYPE;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index bfe8550..3a1bb01 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -121,6 +121,8 @@ public class AggregationFunctionFactory {
             return new MinMaxRangeAggregationFunction(firstArgument);
           case DISTINCTCOUNT:
             return new DistinctCountAggregationFunction(firstArgument);
+          case DISTINCTCOUNTBITMAP:
+            return new DistinctCountBitmapAggregationFunction(firstArgument);
           case DISTINCTCOUNTHLL:
             return new DistinctCountHLLAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLL:
@@ -145,6 +147,8 @@ public class AggregationFunctionFactory {
             return new MinMaxRangeMVAggregationFunction(firstArgument);
           case DISTINCTCOUNTMV:
             return new DistinctCountMVAggregationFunction(firstArgument);
+          case DISTINCTCOUNTBITMAPMV:
+            return new DistinctCountBitmapMVAggregationFunction(firstArgument);
           case DISTINCTCOUNTHLLMV:
             return new DistinctCountHLLMVAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLLMV:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 8e0a8a6..72c8d4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -45,6 +45,12 @@ public class AggregationFunctionVisitorBase {
   public void visit(DistinctCountMVAggregationFunction function) {
   }
 
+  /**
+   * Visits a {@link DistinctCountBitmapAggregationFunction}; does nothing in this base class.
+   */
+  public void visit(DistinctCountBitmapAggregationFunction function) {
+  }
+
+  /**
+   * Visits a {@link DistinctCountBitmapMVAggregationFunction}; does nothing in this base class.
+   */
+  public void visit(DistinctCountBitmapMVAggregationFunction function) {
+  }
+
   public void visit(DistinctCountHLLAggregationFunction function) {
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
new file mode 100644
index 0000000..fbb5153
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountBitmapAggregationFunction} calculates the number of distinct values for a given single-value
+ * expression using RoaringBitmap. The bitmap stores the actual values for {@code INT} expression, or hash code of the
+ * values for other data types (values with the same hash code will only be counted once).
+ * <p>Input handling:
+ * <ul>
+ *   <li>{@code BYTES} values are treated as serialized RoaringBitmaps and OR-ed together.</li>
+ *   <li>Dictionary-encoded expressions collect dictionary ids, which are converted to values (or value hash codes)
+ *   when the result is extracted.</li>
+ *   <li>Other expressions store the values ({@code INT}) or their hash codes directly.</li>
+ * </ul>
+ * <p>For dictionary-encoded expressions the dictionary is stored next to the dictionary-id bitmap inside the result
+ * holder rather than in a field of this function. That way the ids in a holder are always decoded with the dictionary
+ * they came from, even if one function instance is used for several segments (possibly concurrently).
+ */
+public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<RoaringBitmap, Integer> {
+  /**
+   * Legacy per-instance dictionary, kept ({@code protected}) for backward compatibility with subclasses. This class
+   * never writes it. When it is non-null, it is used to decode plain bitmaps found in a result holder.
+   * NOTE(review): per-instance state can mix up dictionaries if the instance is shared across segments; subclasses
+   * should prefer the {@code getDictIdBitmap(...)} helpers.
+   */
+  protected Dictionary _dictionary;
+
+  public DistinctCountBitmapAggregationFunction(ExpressionContext expression) {
+    super(expression);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      RoaringBitmap bitmap = aggregationResultHolder.getResult();
+      int startIndex = 0;
+      if (bitmap == null) {
+        if (length == 0) {
+          return;
+        }
+        // Reuse the first deserialized bitmap as the result instead of OR-ing it into an empty one
+        bitmap = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[0]);
+        aggregationResultHolder.setValue(bitmap);
+        startIndex = 1;
+      }
+      for (int i = startIndex; i < length; i++) {
+        bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]));
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids (together with their dictionary) into the holder
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      getDictIdBitmap(aggregationResultHolder, dictionary).addN(dictIds, 0, length);
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    RoaringBitmap bitmap = getBitmap(aggregationResultHolder);
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        bitmap.addN(intValues, 0, length);
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        int groupKey = groupKeyArray[i];
+        RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+        if (bitmap != null) {
+          bitmap.or(value);
+        } else {
+          groupByResultHolder.setValueForKey(groupKey, value);
+        }
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids (together with their dictionary) into the holder
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized RoaringBitmap
+    DataType valueType = blockValSet.getValueType();
+    if (valueType == DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        RoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.deserialize(bytesValues[i]);
+        for (int groupKey : groupKeysArray[i]) {
+          RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+          if (bitmap != null) {
+            bitmap.or(value);
+          } else {
+            // Clone a bitmap for the group, since the same value is shared by multiple groups
+            groupByResultHolder.setValueForKey(groupKey, value.clone());
+          }
+        }
+      }
+      return;
+    }
+
+    // For dictionary-encoded expression, store dictionary ids (together with their dictionary) into the holder
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        int dictId = dictIds[i];
+        for (int groupKey : groupKeysArray[i]) {
+          getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictId);
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the bitmap
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], intValues[i]);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Long.hashCode(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Float.hashCode(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Double.hashCode(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public RoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return toValueBitmap(aggregationResultHolder.getResult());
+  }
+
+  @Override
+  public RoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    return toValueBitmap(groupByResultHolder.getResult(groupKey));
+  }
+
+  @Override
+  public RoaringBitmap merge(RoaringBitmap intermediateResult1, RoaringBitmap intermediateResult2) {
+    intermediateResult1.or(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.INT;
+  }
+
+  @Override
+  public Integer extractFinalResult(RoaringBitmap intermediateResult) {
+    return intermediateResult.getCardinality();
+  }
+
+  /**
+   * Returns the bitmap from the result holder or creates a new one if it does not exist.
+   */
+  protected static RoaringBitmap getBitmap(AggregationResultHolder aggregationResultHolder) {
+    RoaringBitmap bitmap = aggregationResultHolder.getResult();
+    if (bitmap == null) {
+      bitmap = new RoaringBitmap();
+      aggregationResultHolder.setValue(bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Returns the bitmap for the given group key or creates a new one if it does not exist.
+   */
+  protected static RoaringBitmap getBitmap(GroupByResultHolder groupByResultHolder, int groupKey) {
+    RoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    if (bitmap == null) {
+      bitmap = new RoaringBitmap();
+      groupByResultHolder.setValueForKey(groupKey, bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Returns the dictionary-id bitmap stored in the result holder. If the holder is empty, this creates one bound to
+   * the given dictionary and stores it in the holder.
+   */
+  protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult();
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      aggregationResultHolder.setValue(dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Returns the dictionary-id bitmap for the given group key. If none exists, this creates one bound to the given
+   * dictionary and stores it for that key.
+   */
+  protected static RoaringBitmap getDictIdBitmap(GroupByResultHolder groupByResultHolder, int groupKey,
+      Dictionary dictionary) {
+    DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
+    if (dictIdsWrapper == null) {
+      dictIdsWrapper = new DictIdsWrapper(dictionary);
+      groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
+    }
+    return dictIdsWrapper._dictIdBitmap;
+  }
+
+  /**
+   * Helper method to set value for the given group keys into the result holder.
+   */
+  private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
+    for (int groupKey : groupKeys) {
+      getBitmap(groupByResultHolder, groupKey).add(value);
+    }
+  }
+
+  /**
+   * Converts the object stored in a result holder into a bitmap of values (or value hash codes).
+   * <ul>
+   *   <li>A missing result becomes an empty bitmap.</li>
+   *   <li>A {@link DictIdsWrapper} is decoded with its own dictionary.</li>
+   *   <li>A plain bitmap is returned as is, unless the legacy {@code _dictionary} field is set, in which case it is
+   *   decoded with that dictionary.</li>
+   * </ul>
+   */
+  private RoaringBitmap toValueBitmap(Object result) {
+    if (result == null) {
+      return new RoaringBitmap();
+    }
+    if (result instanceof DictIdsWrapper) {
+      // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+      DictIdsWrapper dictIdsWrapper = (DictIdsWrapper) result;
+      return convertToValueBitmap(dictIdsWrapper._dictIdBitmap, dictIdsWrapper._dictionary);
+    }
+    RoaringBitmap bitmap = (RoaringBitmap) result;
+    if (_dictionary != null) {
+      // Legacy path: a plain bitmap of dictionary ids whose dictionary was recorded in the instance field
+      return convertToValueBitmap(bitmap, _dictionary);
+    }
+    // For serialized RoaringBitmap and non-dictionary-encoded expression, directly return the bitmap
+    return bitmap;
+  }
+
+  /**
+   * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded
+   * expression.
+   */
+  private static RoaringBitmap convertToValueBitmap(RoaringBitmap dictIdBitmap, Dictionary dictionary) {
+    RoaringBitmap valueBitmap = new RoaringBitmap();
+    PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
+    DataType valueType = dictionary.getValueType();
+    switch (valueType) {
+      case INT:
+        while (iterator.hasNext()) {
+          valueBitmap.add(dictionary.getIntValue(iterator.next()));
+        }
+        break;
+      case LONG:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Long.hashCode(dictionary.getLongValue(iterator.next())));
+        }
+        break;
+      case FLOAT:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Float.hashCode(dictionary.getFloatValue(iterator.next())));
+        }
+        break;
+      case DOUBLE:
+        while (iterator.hasNext()) {
+          valueBitmap.add(Double.hashCode(dictionary.getDoubleValue(iterator.next())));
+        }
+        break;
+      case STRING:
+        while (iterator.hasNext()) {
+          valueBitmap.add(dictionary.getStringValue(iterator.next()).hashCode());
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+    return valueBitmap;
+  }
+
+  /**
+   * Pairs a bitmap of dictionary ids with the dictionary needed to decode them. It is stored in the result holders,
+   * so that decoding never depends on state held by the function instance.
+   */
+  private static final class DictIdsWrapper {
+    final Dictionary _dictionary;
+    final RoaringBitmap _dictIdBitmap;
+
+    DictIdsWrapper(Dictionary dictionary) {
+      _dictionary = dictionary;
+      _dictIdBitmap = new RoaringBitmap();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java
new file mode 100644
index 0000000..2f5dbba
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapMVAggregationFunction.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The {@code DistinctCountBitmapMVAggregationFunction} calculates the number of distinct values for a given multi-value
+ * expression using RoaringBitmap. The bitmap stores the actual values for {@code INT} expression, or hash code of the
+ * values for other data types (values with the same hash code will only be counted once).
+ */
+public class DistinctCountBitmapMVAggregationFunction extends DistinctCountBitmapAggregationFunction {
+
+  /**
+   * Creates the aggregation function for the given multi-value expression.
+   */
+  public DistinctCountBitmapMVAggregationFunction(ExpressionContext expression) {
+    super(expression);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAPMV;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * Adds every value of the first {@code length} multi-value entries to the single aggregation bitmap.
+   * <p>For a dictionary-encoded expression the dictionary ids are added and the dictionary is remembered in
+   * {@code _dictionary} (presumably so that the parent class can translate the ids back to values later). Otherwise
+   * the raw values are added for {@code INT}, and the hash codes of the values for {@code LONG}, {@code FLOAT},
+   * {@code DOUBLE} and {@code STRING}.
+   *
+   * @throws IllegalStateException if the expression is not dictionary-encoded and its value type is unsupported
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    RoaringBitmap bitmap = getBitmap(aggregationResultHolder);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        bitmap.add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store INT values or hash code of other values into the bitmap
+    DataType valueType = blockValSet.getValueType();
+    switch (valueType) {
+      case INT:
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (long value : longValues[i]) {
+            bitmap.add(Long.hashCode(value));
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (float value : floatValues[i]) {
+            bitmap.add(Float.hashCode(value));
+          }
+        }
+        // Must not fall through: the DOUBLE case would re-read the block as doubles and add their hash codes too
+        break;
+      case DOUBLE:
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (double value : doubleValues[i]) {
+            bitmap.add(Double.hashCode(value));
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (String value : stringValues[i]) {
+            bitmap.add(value.hashCode());
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds every value of each of the first {@code length} multi-value entries to the bitmap of that entry's group key.
+   * Values are encoded as in {@link #aggregate}: dictionary ids, raw {@code INT} values, or value hash codes.
+   *
+   * @throws IllegalStateException if the expression is not dictionary-encoded and its value type is unsupported
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        getBitmap(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store INT values or hash code of other values into the bitmap
+    DataType valueType = blockValSet.getValueType();
+    switch (valueType) {
+      case INT:
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          getBitmap(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+        }
+        break;
+      case LONG:
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+          for (long value : longValues[i]) {
+            bitmap.add(Long.hashCode(value));
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+          for (float value : floatValues[i]) {
+            bitmap.add(Float.hashCode(value));
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+          for (double value : doubleValues[i]) {
+            bitmap.add(Double.hashCode(value));
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKeyArray[i]);
+          for (String value : stringValues[i]) {
+            bitmap.add(value.hashCode());
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds every value of each of the first {@code length} multi-value entries to the bitmaps of all the entry's group
+   * keys. Values are encoded as in {@link #aggregate}: dictionary ids, raw {@code INT} values, or value hash codes.
+   *
+   * @throws IllegalStateException if the expression is not dictionary-encoded and its value type is unsupported
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // For dictionary-encoded expression, store dictionary ids into the bitmap
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          getBitmap(groupByResultHolder, groupKey).add(dictIds[i]);
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store INT values or hash code of other values into the bitmap
+    DataType valueType = blockValSet.getValueType();
+    switch (valueType) {
+      case INT:
+        int[][] intValues = blockValSet.getIntValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            getBitmap(groupByResultHolder, groupKey).add(intValues[i]);
+          }
+        }
+        break;
+      case LONG:
+        long[][] longValues = blockValSet.getLongValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+            for (long value : longValues[i]) {
+              bitmap.add(Long.hashCode(value));
+            }
+          }
+        }
+        break;
+      case FLOAT:
+        float[][] floatValues = blockValSet.getFloatValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+            for (float value : floatValues[i]) {
+              bitmap.add(Float.hashCode(value));
+            }
+          }
+        }
+        break;
+      case DOUBLE:
+        double[][] doubleValues = blockValSet.getDoubleValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+            for (double value : doubleValues[i]) {
+              bitmap.add(Double.hashCode(value));
+            }
+          }
+        }
+        break;
+      case STRING:
+        String[][] stringValues = blockValSet.getStringValuesMV();
+        for (int i = 0; i < length; i++) {
+          for (int groupKey : groupKeysArray[i]) {
+            RoaringBitmap bitmap = getBitmap(groupByResultHolder, groupKey);
+            for (String value : stringValues[i]) {
+              bitmap.add(value.hashCode());
+            }
+          }
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP_MV aggregation function: " + valueType);
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java
new file mode 100644
index 0000000..23e9ca2
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountBitmapStarTreeV2Test.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.pinot.core.data.aggregator.DistinctCountBitmapValueAggregator;
+import org.apache.pinot.core.data.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Star-tree V2 test for {@link DistinctCountBitmapValueAggregator} over random {@code INT} raw values in [0, 100); the
+ * bitmap aggregated through the star-tree must equal the one aggregated without it.
+ */
+public class DistinctCountBitmapStarTreeV2Test extends BaseStarTreeV2Test<Object, RoaringBitmap> {
+
+  @Override
+  ValueAggregator<Object, RoaringBitmap> getValueAggregator() {
+    // Aggregator under test: raw values in, RoaringBitmap aggregated values out
+    return new DistinctCountBitmapValueAggregator();
+  }
+
+  @Override
+  DataType getRawValueType() {
+    // Matches the Integer values produced by getRandomRawValue
+    return DataType.INT;
+  }
+
+  @Override
+  Object getRandomRawValue(Random rng) {
+    // Values are drawn from [0, 100)
+    int rawValue = rng.nextInt(100);
+    return rawValue;
+  }
+
+  @Override
+  void assertAggregatedValue(RoaringBitmap starTreeBitmap, RoaringBitmap nonStarTreeBitmap) {
+    // Both aggregation paths must yield exactly the same set of values
+    assertEquals(starTreeBitmap, nonStarTreeBitmap);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
new file mode 100644
index 0000000..8a7fa41
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountBitmapQueriesTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for DISTINCT_COUNT_BITMAP queries.
+ * <p>All columns of a record are derived from the same random int value (the BYTES column holds a serialized bitmap
+ * containing that value), so expected distinct counts can be computed from hash code sets built during data generation.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctCountBitmapQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DistinctCountBitmapQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String BYTES_COLUMN = "bytesColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  private Set<Integer> _values;
+  private int[] _expectedResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _values = new HashSet<>(hashMapCapacity);
+    Set<Integer> longResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> floatResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> doubleResultSet = new HashSet<>(hashMapCapacity);
+    Set<Integer> stringResultSet = new HashSet<>(hashMapCapacity);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      int value = RANDOM.nextInt(MAX_VALUE);
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, value);
+      _values.add(Integer.hashCode(value));
+      record.putValue(LONG_COLUMN, (long) value);
+      longResultSet.add(Long.hashCode(value));
+      record.putValue(FLOAT_COLUMN, (float) value);
+      floatResultSet.add(Float.hashCode(value));
+      record.putValue(DOUBLE_COLUMN, (double) value);
+      doubleResultSet.add(Double.hashCode(value));
+      String stringValue = Integer.toString(value);
+      record.putValue(STRING_COLUMN, stringValue);
+      stringResultSet.add(stringValue.hashCode());
+      // Store serialized bitmaps in the BYTES column
+      RoaringBitmap bitmap = new RoaringBitmap();
+      bitmap.add(value);
+      byte[] bytesValue = ObjectSerDeUtils.ROARING_BITMAP_SER_DE.serialize(bitmap);
+      record.putValue(BYTES_COLUMN, bytesValue);
+      records.add(record);
+    }
+    _expectedResults =
+        new int[]{_values.size(), longResultSet.size(), floatResultSet.size(), doubleResultSet.size(),
+            stringResultSet.size(), _values.size()};
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /**
+   * Runs DISTINCTCOUNTBITMAP on every column type without GROUP BY, checking both the per-segment intermediate
+   * bitmaps and the final counts returned by the broker.
+   */
+  @Test
+  public void testAggregationOnly() {
+    String query =
+        "SELECT DISTINCTCOUNTBITMAP(intColumn), DISTINCTCOUNTBITMAP(longColumn), DISTINCTCOUNTBITMAP(floatColumn), DISTINCTCOUNTBITMAP(doubleColumn), DISTINCTCOUNTBITMAP(stringColumn), DISTINCTCOUNTBITMAP(bytesColumn) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 6 * NUM_RECORDS,
+            NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    assertNotNull(aggregationResult);
+    for (int i = 0; i < 6; i++) {
+      assertEquals(((RoaringBitmap) aggregationResult.get(i)).getCardinality(), _expectedResults[i]);
+    }
+
+    // Inter segments
+    String[] expectedResults = new String[6];
+    for (int i = 0; i < 6; i++) {
+      expectedResults[i] = Integer.toString(_expectedResults[i]);
+    }
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * 6 * NUM_RECORDS, 4 * NUM_RECORDS,
+            expectedResults);
+  }
+
+  /**
+   * Runs DISTINCTCOUNTBITMAP on every column type grouped by intColumn. All columns derive from the same int value, so
+   * every group must contain exactly one distinct value for each aggregation.
+   */
+  @Test
+  public void testAggregationGroupBy() {
+    String query =
+        "SELECT DISTINCTCOUNTBITMAP(intColumn), DISTINCTCOUNTBITMAP(longColumn), DISTINCTCOUNTBITMAP(floatColumn), DISTINCTCOUNTBITMAP(doubleColumn), DISTINCTCOUNTBITMAP(stringColumn), DISTINCTCOUNTBITMAP(bytesColumn) FROM testTable GROUP BY intColumn";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 6 * NUM_RECORDS,
+            NUM_RECORDS);
+    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      assertTrue(_values.contains(Integer.parseInt(groupKey._stringKey)));
+      for (int i = 0; i < 6; i++) {
+        assertEquals(((RoaringBitmap) aggregationGroupByResult.getResultForKey(groupKey, i)).getCardinality(), 1);
+      }
+    }
+    assertEquals(numGroups, _values.size());
+
+    // Inter segments
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    // size of this array will be equal to number of aggregation functions since
+    // we return each aggregation function separately
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    int numAggregationColumns = aggregationResults.size();
+    Assert.assertEquals(numAggregationColumns, 6);
+    for (AggregationResult aggregationResult : aggregationResults) {
+      Assert.assertNull(aggregationResult.getValue());
+      List<GroupByResult> groupByResults = aggregationResult.getGroupByResult();
+      numGroups = groupByResults.size();
+      for (int i = 0; i < numGroups; i++) {
+        GroupByResult groupByResult = groupByResults.get(i);
+        List<String> group = groupByResult.getGroup();
+        assertEquals(group.size(), 1);
+        assertTrue(_values.contains(Integer.parseInt(group.get(0))));
+        assertEquals(groupByResult.getValue(), Integer.toString(1));
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index 7b80f71..5de486d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -65,7 +65,8 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
   private static final int NUM_STAR_TREE_METRICS = 5;
   private static final List<AggregationFunctionType> AGGREGATION_FUNCTION_TYPES = Arrays
       .asList(AggregationFunctionType.COUNT, AggregationFunctionType.MIN, AggregationFunctionType.MAX,
-          AggregationFunctionType.SUM, AggregationFunctionType.AVG, AggregationFunctionType.MINMAXRANGE);
+          AggregationFunctionType.SUM, AggregationFunctionType.AVG,
+          AggregationFunctionType.MINMAXRANGE, AggregationFunctionType.DISTINCTCOUNTBITMAP);
   private static final int NUM_QUERIES_TO_GENERATE = 100;
 
   private String _currentTable;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org