You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/07/29 00:18:06 UTC

[incubator-pinot] 01/01: Adding distinct count support based on bitmap

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch distinct-count-bitmap
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cbcc6a0d0641ebc7ea99d7bda488599b56f5c845
Author: kishoreg <g....@gmail.com>
AuthorDate: Tue Jul 28 17:16:40 2020 -0700

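    Add distinctCountBitmap aggregation function: counts distinct values by
    collecting their sign-masked hash codes in a RoaringBitmap, so distinct
    values whose hash codes collide are counted once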
---
 .../common/function/AggregationFunctionType.java   |   1 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  62 +++-
 .../DistinctCountBitmapValueAggregator.java        |  95 ++++++
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionVisitorBase.java   |   3 +
 .../function/DistinctCountAggregationFunction.java |  71 +++++
 .../DistinctCountBitmapAggregationFunction.java    | 354 +++++++++++++++++++++
 7 files changed, 572 insertions(+), 16 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index ff3fb50..6c7ebe5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -28,6 +28,7 @@ public enum AggregationFunctionType {
   MINMAXRANGE("minMaxRange"),
   DISTINCTCOUNT("distinctCount"),
   DISTINCTCOUNTHLL("distinctCountHLL"),
+  DISTINCTCOUNTBITMAP("distinctCountBitmap"),
   DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
   FASTHLL("fastHLL"),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index f471e37..5cbe20f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -40,6 +40,8 @@ import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
@@ -63,7 +65,8 @@ public class ObjectSerDeUtils {
     IntSet(9),
     TDigest(10),
     DistinctTable(11),
-    DataSketch(12);
+    DataSketch(12),
+    Bitmap(13);
 
     private int _value;
 
@@ -102,6 +105,8 @@ public class ObjectSerDeUtils {
         return ObjectType.DistinctTable;
       } else if (value instanceof Sketch) {
         return ObjectType.DataSketch;
+      } else if (value instanceof MutableRoaringBitmap) {
+        return ObjectType.Bitmap;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -286,6 +291,44 @@ public class ObjectSerDeUtils {
     }
   };
 
+  // Ser/de for ObjectType.Bitmap values (MutableRoaringBitmap) in RoaringBitmap's own serialization format.
+  // It must stay at index 13 of SER_DES to match ObjectType.Bitmap(13).
+  public static final ObjectSerDe<MutableRoaringBitmap> ROARING_BITMAP_SERDE = new ObjectSerDe<MutableRoaringBitmap>() {
+
+    // Serializes into an array sized exactly by serializedSizeInBytes(); failures are rethrown unchecked
+    @Override
+    public byte[] serialize(MutableRoaringBitmap bitmap) {
+      try {
+        byte[] bytes = new byte[bitmap.serializedSizeInBytes()];
+        bitmap.serialize(ByteBuffer.wrap(bytes));
+        return bytes;
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while serializing MutableRoaringBitmap", e);
+      }
+    }
+
+    // Returns a new, independently mutable bitmap read from the given bytes
+    @Override
+    public MutableRoaringBitmap deserialize(byte[] bytes) {
+      try {
+        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+        bitmap.deserialize(ByteBuffer.wrap(bytes));
+        return bitmap;
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while de-serializing MutableRoaringBitmap", e);
+      }
+    }
+
+    // Consumes every remaining byte of the buffer (its position ends at the limit), then delegates to
+    // deserialize(byte[]) so that both overloads share one implementation and one error message
+    @Override
+    public MutableRoaringBitmap deserialize(ByteBuffer byteBuffer) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return deserialize(bytes);
+    }
+  };
+
   public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {
 
     @Override
@@ -484,21 +527,22 @@ public class ObjectSerDeUtils {
 
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
       STRING_SER_DE,
       LONG_SER_DE,
       DOUBLE_SER_DE,
       DOUBLE_ARRAY_LIST_SER_DE,
       AVG_PAIR_SER_DE,
       MIN_MAX_RANGE_PAIR_SER_DE,
       HYPER_LOG_LOG_SER_DE,
       QUANTILE_DIGEST_SER_DE,
       MAP_SER_DE,
       INT_SET_SER_DE,
       TDIGEST_SER_DE,
       DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE
+      DATA_SKETCH_SER_DE,
+      ROARING_BITMAP_SERDE
   };
   //@formatter:on
 
   public static byte[] serialize(Object value) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
new file mode 100644
index 0000000..5623a17
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.aggregator;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import java.util.Objects;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Value aggregator for {@link AggregationFunctionType#DISTINCTCOUNTBITMAP}.
+ *
+ * <p>Each non-{@code byte[]} raw value sets one bit: its hash code with the sign bit cleared. A {@code byte[]} raw
+ * value is treated as a serialized bitmap and OR-ed in. Distinct raw values whose masked hash codes collide share a
+ * bit, so the resulting cardinality can undercount.
+ */
+public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, MutableRoaringBitmap> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Largest serialized size (in bytes) seen so far for any value produced or consumed by this aggregator
+  private int _maxByteSize;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  @Override
+  public MutableRoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    MutableRoaringBitmap initialValue;
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+    } else {
+      initialValue = new MutableRoaringBitmap();
+      initialValue.add(toBitmapValue(rawValue));
+      _maxByteSize = Math.max(_maxByteSize, initialValue.serializedSizeInBytes());
+    }
+    return initialValue;
+  }
+
+  @Override
+  public MutableRoaringBitmap applyRawValue(MutableRoaringBitmap value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      value.or(deserializeAggregatedValue((byte[]) rawValue));
+    } else {
+      value.add(toBitmapValue(rawValue));
+    }
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  @Override
+  public MutableRoaringBitmap applyAggregatedValue(MutableRoaringBitmap value, MutableRoaringBitmap aggregatedValue) {
+    value.or(aggregatedValue);
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  /**
+   * Returns an independent copy of the bitmap. {@link MutableRoaringBitmap#clone()} copies the containers directly
+   * instead of round-tripping through serialization.
+   */
+  @Override
+  public MutableRoaringBitmap cloneAggregatedValue(MutableRoaringBitmap value) {
+    return value.clone();
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(MutableRoaringBitmap value) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.serialize(value);
+  }
+
+  @Override
+  public MutableRoaringBitmap deserializeAggregatedValue(byte[] bytes) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytes);
+  }
+
+  /**
+   * Maps a raw value to its bit: {@code Objects.hashCode(rawValue)} with the sign bit cleared.
+   */
+  private static int toBitmapValue(Object rawValue) {
+    return Objects.hashCode(rawValue) & Integer.MAX_VALUE;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 021c188..37cebaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -123,6 +123,8 @@ public class AggregationFunctionFactory {
             return new DistinctCountAggregationFunction(firstArgument);
           case DISTINCTCOUNTHLL:
             return new DistinctCountHLLAggregationFunction(arguments);
+          case DISTINCTCOUNTBITMAP:
+            return new DistinctCountBitmapAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLL:
             return new DistinctCountRawHLLAggregationFunction(arguments);
           case FASTHLL:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 2b5b615..aadb583 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -98,5 +98,8 @@ public class AggregationFunctionVisitorBase {
 
   public void visit(DistinctCountThetaSketchAggregationFunction function) {
   }
+
+  public void visit(DistinctCountBitmapAggregationFunction function) {
+  }
 }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
new file mode 100644
index 0000000..1a10f07
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Aggregation function for DISTINCTCOUNTBITMAP: counts distinct values by setting one bit per value in a
+ * {@link MutableRoaringBitmap}.
+ *
+ * <p>INT, LONG, FLOAT, DOUBLE and STRING values are mapped to their hash code with the sign bit cleared, so
+ * distinct values whose masked hash codes collide are counted once: the result is exact only when no such collision
+ * occurs. BYTES values are treated as serialized bitmaps and OR-ed together. Only the first argument is used.
+ */
+public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<ImmutableRoaringBitmap, Long> {
+
+  public DistinctCountBitmapAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+    MutableRoaringBitmap bitmap = getDefaultBitmap(aggregationResultHolder);
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(toBitmapValue(intValues[i]));
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(toBitmapValue(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(toBitmapValue(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(toBitmapValue(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(toBitmapValue(stringValues[i]));
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: OR each one into the accumulated result
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]));
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(toBitmapValue(intValues[i]));
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(toBitmapValue(longValues[i]));
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(toBitmapValue(floatValues[i]));
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(toBitmapValue(doubleValues[i]));
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(toBitmapValue(stringValues[i]));
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: OR each one into the bitmap of its group
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            MutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]);
+            int groupKey = groupKeyArray[i];
+            MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+            if (bitmap != null) {
+              bitmap.or(value);
+            } else {
+              // The freshly deserialized bitmap is not referenced anywhere else, so it is stored without a copy
+              groupByResultHolder.setValueForKey(groupKey, value);
+            }
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          int value = toBitmapValue(intValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(value);
+          }
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          int value = toBitmapValue(longValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(value);
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          int value = toBitmapValue(floatValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(value);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          int value = toBitmapValue(doubleValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(value);
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          int value = toBitmapValue(stringValues[i]);
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(value);
+          }
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: OR each one into the bitmap of every group of the row. The deserialized bitmap is
+        // never stored as-is because several groups would then share (and mutate) the same instance.
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            MutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              getDefaultBitmap(groupByResultHolder, groupKey).or(value);
+            }
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  @Override
+  public ImmutableRoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+    return bitmap != null ? bitmap : new MutableRoaringBitmap();
+  }
+
+  @Override
+  public MutableRoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    return bitmap != null ? bitmap : new MutableRoaringBitmap();
+  }
+
+  /**
+   * Returns the union of the two intermediate results, reusing {@code intermediateResult1} when it is mutable.
+   *
+   * <p>NOTE: {@link ImmutableRoaringBitmap} has no instance {@code or()}: {@code intermediateResult1.or(x)} would
+   * resolve to the static {@code ImmutableRoaringBitmap.or(...)}, whose result (a new bitmap) must not be dropped.
+   */
+  @Override
+  public ImmutableRoaringBitmap merge(ImmutableRoaringBitmap intermediateResult1,
+      ImmutableRoaringBitmap intermediateResult2) {
+    if (intermediateResult1 instanceof MutableRoaringBitmap) {
+      ((MutableRoaringBitmap) intermediateResult1).or(intermediateResult2);
+      return intermediateResult1;
+    }
+    return ImmutableRoaringBitmap.or(intermediateResult1, intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(ImmutableRoaringBitmap intermediateResult) {
+    return (long) intermediateResult.getCardinality();
+  }
+
+  /**
+   * Returns the bitmap from the result holder, or creates and stores an empty one if it does not exist yet.
+   *
+   * @param aggregationResultHolder Result holder
+   * @return Bitmap from the result holder
+   */
+  protected MutableRoaringBitmap getDefaultBitmap(AggregationResultHolder aggregationResultHolder) {
+    MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+    if (bitmap == null) {
+      bitmap = new MutableRoaringBitmap();
+      aggregationResultHolder.setValue(bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Returns the bitmap for the given group key, or creates and stores an empty one if it does not exist yet.
+   *
+   * @param groupByResultHolder Result holder
+   * @param groupKey Group key for which to return the bitmap
+   * @return Bitmap for the group key
+   */
+  protected MutableRoaringBitmap getDefaultBitmap(GroupByResultHolder groupByResultHolder, int groupKey) {
+    MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    if (bitmap == null) {
+      bitmap = new MutableRoaringBitmap();
+      groupByResultHolder.setValueForKey(groupKey, bitmap);
+    }
+    return bitmap;
+  }
+
+  /*
+   * Value-to-bit mapping: the value's hash code with the sign bit cleared. For the boxed equivalents this equals
+   * Objects.hashCode(value) & Integer.MAX_VALUE, the mapping DistinctCountBitmapValueAggregator applies to raw
+   * values, so bitmaps it produces and bitmaps built here use the same bit for the same value.
+   */
+  private static int toBitmapValue(int value) {
+    return Integer.hashCode(value) & Integer.MAX_VALUE;
+  }
+
+  private static int toBitmapValue(long value) {
+    return Long.hashCode(value) & Integer.MAX_VALUE;
+  }
+
+  private static int toBitmapValue(float value) {
+    return Float.hashCode(value) & Integer.MAX_VALUE;
+  }
+
+  private static int toBitmapValue(double value) {
+    return Double.hashCode(value) & Integer.MAX_VALUE;
+  }
+
+  private static int toBitmapValue(String value) {
+    return value.hashCode() & Integer.MAX_VALUE;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org