You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/07/29 00:18:05 UTC

[incubator-pinot] branch distinct-count-bitmap created (now cbcc6a0)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch distinct-count-bitmap
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at cbcc6a0  Adding distinct count support based on bitmap

This branch includes the following new commits:

     new cbcc6a0  Adding distinct count support based on bitmap

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding distinct count support based on bitmap

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch distinct-count-bitmap
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cbcc6a0d0641ebc7ea99d7bda488599b56f5c845
Author: kishoreg <g....@gmail.com>
AuthorDate: Tue Jul 28 17:16:40 2020 -0700

    Adding distinct count support based on bitmap
---
 .../common/function/AggregationFunctionType.java   |   1 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  62 +++-
 .../DistinctCountBitmapValueAggregator.java        |  95 ++++++
 .../function/AggregationFunctionFactory.java       |   2 +
 .../function/AggregationFunctionVisitorBase.java   |   3 +
 .../function/DistinctCountAggregationFunction.java |  71 +++++
 .../DistinctCountBitmapAggregationFunction.java    | 354 +++++++++++++++++++++
 7 files changed, 572 insertions(+), 16 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index ff3fb50..6c7ebe5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -28,6 +28,7 @@ public enum AggregationFunctionType {
   MINMAXRANGE("minMaxRange"),
   DISTINCTCOUNT("distinctCount"),
   DISTINCTCOUNTHLL("distinctCountHLL"),
+  DISTINCTCOUNTBITMAP("distinctCountBitmap"),
   DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
   FASTHLL("fastHLL"),
   DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index f471e37..5cbe20f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -40,6 +40,8 @@ import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
@@ -63,7 +65,8 @@ public class ObjectSerDeUtils {
     IntSet(9),
     TDigest(10),
     DistinctTable(11),
-    DataSketch(12);
+    DataSketch(12),
+    Bitmap(13);
 
     private int _value;
 
@@ -102,6 +105,8 @@ public class ObjectSerDeUtils {
         return ObjectType.DistinctTable;
       } else if (value instanceof Sketch) {
         return ObjectType.DataSketch;
+      } else if (value instanceof MutableRoaringBitmap) {
+        return ObjectType.Bitmap;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -286,6 +291,44 @@ public class ObjectSerDeUtils {
     }
   };
 
+  /**
+   * SerDe for {@link MutableRoaringBitmap}: values are written with the bitmap's own ByteBuffer serialization, and
+   * both deserialize overloads read that same format back.
+   */
+  public static final ObjectSerDe<MutableRoaringBitmap> ROARING_BITMAP_SERDE = new ObjectSerDe<MutableRoaringBitmap>() {
+
+    @Override
+    public byte[] serialize(MutableRoaringBitmap bitmap) {
+      try {
+        // The array is sized up front from the serialized size reported by the bitmap
+        byte[] serialized = new byte[bitmap.serializedSizeInBytes()];
+        bitmap.serialize(ByteBuffer.wrap(serialized));
+        return serialized;
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while serializing RoaringBitmap", e);
+      }
+    }
+
+    @Override
+    public MutableRoaringBitmap deserialize(byte[] bytes) {
+      MutableRoaringBitmap restored = new MutableRoaringBitmap();
+      try {
+        restored.deserialize(ByteBuffer.wrap(bytes));
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while de-serializing MutableRoaringBitmap", e);
+      }
+      return restored;
+    }
+
+    @Override
+    public MutableRoaringBitmap deserialize(ByteBuffer byteBuffer) {
+      // Drain the remaining bytes into an array, then reuse the byte[] overload
+      byte[] remaining = new byte[byteBuffer.remaining()];
+      byteBuffer.get(remaining);
+      return deserialize(remaining);
+    }
+  };
+
   public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {
 
     @Override
@@ -484,21 +527,8 @@ public class ObjectSerDeUtils {
 
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
-  private static final ObjectSerDe[] SER_DES = {
-      STRING_SER_DE,
-      LONG_SER_DE,
-      DOUBLE_SER_DE,
-      DOUBLE_ARRAY_LIST_SER_DE,
-      AVG_PAIR_SER_DE,
-      MIN_MAX_RANGE_PAIR_SER_DE,
-      HYPER_LOG_LOG_SER_DE,
-      QUANTILE_DIGEST_SER_DE,
-      MAP_SER_DE,
-      INT_SET_SER_DE,
-      TDIGEST_SER_DE,
-      DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE
-  };
+  private static final ObjectSerDe[] SER_DES =
+      {STRING_SER_DE, LONG_SER_DE, DOUBLE_SER_DE, DOUBLE_ARRAY_LIST_SER_DE, AVG_PAIR_SER_DE, MIN_MAX_RANGE_PAIR_SER_DE, HYPER_LOG_LOG_SER_DE, QUANTILE_DIGEST_SER_DE, MAP_SER_DE, INT_SET_SER_DE, TDIGEST_SER_DE, DISTINCT_TABLE_SER_DE, DATA_SKETCH_SER_DE, ROARING_BITMAP_SERDE};
   //@formatter:on
 
   public static byte[] serialize(Object value) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
new file mode 100644
index 0000000..5623a17
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.aggregator;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import java.util.Objects;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/** Value aggregator for DISTINCTCOUNTBITMAP: adds masked value hashes to a bitmap, or merges serialized bitmaps. */
+public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, MutableRoaringBitmap> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+  // Largest serialized bitmap size, in bytes, recorded by this aggregator so far
+  private int _maxByteSize;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  @Override
+  public MutableRoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      MutableRoaringBitmap restored = deserializeAggregatedValue((byte[]) rawValue);
+      _maxByteSize = Math.max(_maxByteSize, ((byte[]) rawValue).length);
+      return restored;
+    }
+    MutableRoaringBitmap singleton = new MutableRoaringBitmap();
+    singleton.add(Objects.hashCode(rawValue) & Integer.MAX_VALUE);
+    _maxByteSize = Math.max(_maxByteSize, singleton.serializedSizeInBytes());
+    return singleton;
+  }
+
+  @Override
+  public MutableRoaringBitmap applyRawValue(MutableRoaringBitmap value, Object rawValue) {
+    if (!(rawValue instanceof byte[])) {
+      value.add(Objects.hashCode(rawValue) & Integer.MAX_VALUE);
+    } else {
+      value.or(deserializeAggregatedValue((byte[]) rawValue));
+    }
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  @Override
+  public MutableRoaringBitmap applyAggregatedValue(MutableRoaringBitmap value, MutableRoaringBitmap aggregatedValue) {
+    value.or(aggregatedValue);
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  @Override
+  public MutableRoaringBitmap cloneAggregatedValue(MutableRoaringBitmap value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(MutableRoaringBitmap value) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.serialize(value);
+  }
+
+  @Override
+  public MutableRoaringBitmap deserializeAggregatedValue(byte[] bytes) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytes);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 021c188..37cebaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -123,6 +123,8 @@ public class AggregationFunctionFactory {
             return new DistinctCountAggregationFunction(firstArgument);
           case DISTINCTCOUNTHLL:
             return new DistinctCountHLLAggregationFunction(arguments);
+          case DISTINCTCOUNTBITMAP:
+            return new DistinctCountBitmapAggregationFunction(arguments);
           case DISTINCTCOUNTRAWHLL:
             return new DistinctCountRawHLLAggregationFunction(arguments);
           case FASTHLL:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 2b5b615..aadb583 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -98,5 +98,8 @@ public class AggregationFunctionVisitorBase {
 
   public void visit(DistinctCountThetaSketchAggregationFunction function) {
   }
+
+  public void visit(DistinctCountBitmapAggregationFunction distinctCountBitmapAggregationFunction) {
+  } // Empty body: the base visitor does nothing for this function type.
 }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 7e7ba4b..6077ba7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,18 +18,23 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
@@ -298,4 +303,70 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
     }
     return valueSet;
   }
+
+  /**
+   * Ad-hoc benchmark comparing IntOpenHashSet, MutableRoaringBitmap and HyperLogLog on the same generated values.
+   * For every structure it prints the elapsed milliseconds followed by the resulting size figures.
+   *
+   * @param args ignored
+   */
+  public static void main(String[] args) {
+    int length = 30_000_000;
+    int cardinality = 10_000_000;
+
+    double[] metric = new double[length];
+    String[] strings = new String[length];
+
+    for (int i = 0; i < length; i++) {
+      // Multiply as long: i * 1001 exceeds Integer.MAX_VALUE once i passes about 2.1 million
+      metric[i] = (i * 1001L) % cardinality;
+      strings[i] = "asdasdasdadsad" + i % cardinality;
+    }
+    System.out.println(" ================= ");
+    //String TEST
+    long start = System.currentTimeMillis();
+    IntOpenHashSet stringSet = new IntOpenHashSet();
+    for (int i = 0; i < length; i++) {
+      stringSet.add(strings[i].hashCode());
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(stringSet.size());
+    System.out.println(" ================= ");
+
+    //DOUBLE TEST
+    start = System.currentTimeMillis();
+    IntOpenHashSet doubleSet = new IntOpenHashSet(1000);
+    for (int i = 0; i < length; i++) {
+      doubleSet.add(Double.hashCode(metric[i]));
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(doubleSet.size());
+    start = System.currentTimeMillis();
+    byte[] serialize = ObjectSerDeUtils.serialize(doubleSet);
+    System.out.println("took = " + (System.currentTimeMillis() - start));
+    System.out.println(serialize.length);
+    System.out.println(" ================= ");
+
+    //BITMAP TEST
+    start = System.currentTimeMillis();
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (int i = 0; i < length; i++) {
+      bitmap.add(Double.hashCode(metric[i]));
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(bitmap.getCardinality());
+    System.out.println(bitmap.serializedSizeInBytes());
+    System.out.println(" ================= ");
+
+    //HLL TEST
+    start = System.currentTimeMillis();
+    HyperLogLog hll = new HyperLogLog(12);
+    for (int i = 0; i < length; i++) {
+      hll.offer(metric[i]);
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(hll.cardinality());
+    System.out.println(hll.sizeof());
+    System.out.println(" ================= ");
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
new file mode 100644
index 0000000..1a10f07
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Distinct count aggregation that collects non-negative 31-bit hashes of the input values in a roaring bitmap.
+ * The final result is the cardinality of that bitmap, so values whose masked hashes collide are counted once.
+ * BYTES input is not hashed: every value is read as a bitmap serialized by ObjectSerDeUtils.ROARING_BITMAP_SERDE.
+ */
+public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<ImmutableRoaringBitmap, Long> {
+
+  /** Aggregates the expression given as the first argument; any further arguments are ignored. */
+  public DistinctCountBitmapAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments.get(0));
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Adds the first {@code length} values of the block to the single bitmap kept in the result holder.
+   *
+   * @throws IllegalStateException if the value type is not INT, LONG, FLOAT, DOUBLE, STRING or BYTES
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+    // Merge into the holder's bitmap (created on demand) so that an empty BYTES block never reads bytesValues[0]
+    MutableRoaringBitmap bitmap = getDefaultBitmap(aggregationResultHolder);
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          bitmap.add(stringValues[i].hashCode() & Integer.MAX_VALUE);
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: union each of them into the result
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]));
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds each of the first {@code length} values to the bitmap of the group key found at the same index.
+   *
+   * @throws IllegalStateException if the value type is not INT, LONG, FLOAT, DOUBLE, STRING or BYTES
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+              .add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+              .add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+              .add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE);
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode() & Integer.MAX_VALUE);
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: union each of them into the bitmap of its group
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+                .or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]));
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  /**
+   * Adds each of the first {@code length} values to the bitmap of every group key listed for the same index.
+   *
+   * @throws IllegalStateException if the value type is not INT, LONG, FLOAT, DOUBLE, STRING or BYTES
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    DataType valueType = blockValSet.getValueType();
+
+    switch (valueType) {
+      case INT:
+        int[] intValues = blockValSet.getIntValuesSV();
+        for (int i = 0; i < length; i++) {
+          int hash = Integer.hashCode(intValues[i]) & Integer.MAX_VALUE;
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(hash);
+          }
+        }
+        break;
+      case LONG:
+        long[] longValues = blockValSet.getLongValuesSV();
+        for (int i = 0; i < length; i++) {
+          int hash = Long.hashCode(longValues[i]) & Integer.MAX_VALUE;
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(hash);
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValues = blockValSet.getFloatValuesSV();
+        for (int i = 0; i < length; i++) {
+          int hash = Float.hashCode(floatValues[i]) & Integer.MAX_VALUE;
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(hash);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValues = blockValSet.getDoubleValuesSV();
+        for (int i = 0; i < length; i++) {
+          int hash = Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE;
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(hash);
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValues = blockValSet.getStringValuesSV();
+        for (int i = 0; i < length; i++) {
+          int hash = stringValues[i].hashCode() & Integer.MAX_VALUE;
+          for (int groupKey : groupKeysArray[i]) {
+            getDefaultBitmap(groupByResultHolder, groupKey).add(hash);
+          }
+        }
+        break;
+      case BYTES:
+        // Serialized bitmaps: union each of them into the bitmap of every group of its row
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        try {
+          for (int i = 0; i < length; i++) {
+            MutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]);
+            for (int groupKey : groupKeysArray[i]) {
+              getDefaultBitmap(groupByResultHolder, groupKey).or(value);
+            }
+          }
+        } catch (Exception e) {
+          throw new RuntimeException("Caught exception while merging Bitmaps", e);
+        }
+        break;
+      default:
+        throw new IllegalStateException(
+            "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+    }
+  }
+
+  /** Returns the bitmap held by the result holder, or an empty bitmap when nothing has been aggregated. */
+  @Override
+  public ImmutableRoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+    return bitmap != null ? bitmap : new MutableRoaringBitmap();
+  }
+
+  /** Returns the bitmap held for the group key, or an empty bitmap when the group has no result yet. */
+  @Override
+  public MutableRoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    return bitmap != null ? bitmap : new MutableRoaringBitmap();
+  }
+
+  /**
+   * Merges two intermediate results into the union of both bitmaps.
+   *
+   * <p>The one-argument {@code or} call on an {@link ImmutableRoaringBitmap} reference cannot update the receiver,
+   * so the union is applied in place only when the first result is mutable and is otherwise built as a new bitmap.
+   *
+   * @return bitmap holding every value of both inputs; this is {@code intermediateResult1} when it was merged in place
+   */
+  @Override
+  public ImmutableRoaringBitmap merge(ImmutableRoaringBitmap intermediateResult1,
+      ImmutableRoaringBitmap intermediateResult2) {
+    if (intermediateResult1 instanceof MutableRoaringBitmap) {
+      ((MutableRoaringBitmap) intermediateResult1).or(intermediateResult2);
+      return intermediateResult1;
+    }
+    return ImmutableRoaringBitmap.or(intermediateResult1, intermediateResult2);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  @Override
+  public Long extractFinalResult(ImmutableRoaringBitmap intermediateResult) {
+    return Long.valueOf(intermediateResult.getCardinality());
+  }
+
+  /**
+   * Returns the bitmap from the result holder, creating and registering an empty one if it does not exist.
+   *
+   * @param aggregationResultHolder Result holder
+   * @return Bitmap from the result holder
+   */
+  protected MutableRoaringBitmap getDefaultBitmap(AggregationResultHolder aggregationResultHolder) {
+    MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+    if (bitmap == null) {
+      bitmap = new MutableRoaringBitmap();
+      aggregationResultHolder.setValue(bitmap);
+    }
+    return bitmap;
+  }
+
+  /**
+   * Returns the bitmap for the given group key, creating and registering an empty one if it does not exist.
+   *
+   * @param groupByResultHolder Result holder
+   * @param groupKey Group key for which to return the bitmap
+   * @return Bitmap for the group key
+   */
+  protected MutableRoaringBitmap getDefaultBitmap(GroupByResultHolder groupByResultHolder, int groupKey) {
+    MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+    if (bitmap == null) {
+      bitmap = new MutableRoaringBitmap();
+      groupByResultHolder.setValueForKey(groupKey, bitmap);
+    }
+    return bitmap;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org