You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/07/29 00:18:06 UTC
[incubator-pinot] 01/01: Adding distinct count support based on bitmap
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch distinct-count-bitmap
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit cbcc6a0d0641ebc7ea99d7bda488599b56f5c845
Author: kishoreg <g....@gmail.com>
AuthorDate: Tue Jul 28 17:16:40 2020 -0700
Adding distinct count support based on bitmap
---
.../common/function/AggregationFunctionType.java | 1 +
.../apache/pinot/core/common/ObjectSerDeUtils.java | 62 +++-
.../DistinctCountBitmapValueAggregator.java | 95 ++++++
.../function/AggregationFunctionFactory.java | 2 +
.../function/AggregationFunctionVisitorBase.java | 3 +
.../function/DistinctCountAggregationFunction.java | 71 +++++
.../DistinctCountBitmapAggregationFunction.java | 354 +++++++++++++++++++++
7 files changed, 572 insertions(+), 16 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index ff3fb50..6c7ebe5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -28,6 +28,7 @@ public enum AggregationFunctionType {
MINMAXRANGE("minMaxRange"),
DISTINCTCOUNT("distinctCount"),
DISTINCTCOUNTHLL("distinctCountHLL"),
+ DISTINCTCOUNTBITMAP("distinctCountBitmap"),
DISTINCTCOUNTRAWHLL("distinctCountRawHLL"),
FASTHLL("fastHLL"),
DISTINCTCOUNTTHETASKETCH("distinctCountThetaSketch"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index f471e37..5cbe20f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -40,6 +40,8 @@ import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
/**
@@ -63,7 +65,8 @@ public class ObjectSerDeUtils {
IntSet(9),
TDigest(10),
DistinctTable(11),
- DataSketch(12);
+ DataSketch(12),
+ Bitmap(13);
private int _value;
@@ -102,6 +105,8 @@ public class ObjectSerDeUtils {
return ObjectType.DistinctTable;
} else if (value instanceof Sketch) {
return ObjectType.DataSketch;
+ } else if (value instanceof MutableRoaringBitmap) {
+ return ObjectType.Bitmap;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
@@ -286,6 +291,44 @@ public class ObjectSerDeUtils {
}
};
+  /**
+   * SerDe for {@link MutableRoaringBitmap} values, using the bitmap's portable serialization format.
+   * I/O failures are rethrown as {@link RuntimeException}.
+   */
+  public static final ObjectSerDe<MutableRoaringBitmap> ROARING_BITMAP_SERDE = new ObjectSerDe<MutableRoaringBitmap>() {
+
+    @Override
+    public byte[] serialize(MutableRoaringBitmap bitmap) {
+      try {
+        ByteBuffer target = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
+        bitmap.serialize(target);
+        return target.array();
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while serializing RoaringBitmap", e);
+      }
+    }
+
+    @Override
+    public MutableRoaringBitmap deserialize(byte[] bytes) {
+      return readBitmap(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public MutableRoaringBitmap deserialize(ByteBuffer byteBuffer) {
+      // Consume every remaining byte of the caller's buffer, then deserialize from a private copy.
+      byte[] copy = new byte[byteBuffer.remaining()];
+      byteBuffer.get(copy);
+      return readBitmap(ByteBuffer.wrap(copy));
+    }
+
+    /** Deserializes a fresh bitmap from {@code source}, wrapping any IOException. */
+    private MutableRoaringBitmap readBitmap(ByteBuffer source) {
+      MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+      try {
+        bitmap.deserialize(source);
+      } catch (IOException e) {
+        throw new RuntimeException("Caught exception while de-serializing MutableRoaringBitmap", e);
+      }
+      return bitmap;
+    }
+  };
+
public static final ObjectSerDe<DistinctTable> DISTINCT_TABLE_SER_DE = new ObjectSerDe<DistinctTable>() {
@Override
@@ -484,21 +527,8 @@ public class ObjectSerDeUtils {
// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
- private static final ObjectSerDe[] SER_DES = {
- STRING_SER_DE,
- LONG_SER_DE,
- DOUBLE_SER_DE,
- DOUBLE_ARRAY_LIST_SER_DE,
- AVG_PAIR_SER_DE,
- MIN_MAX_RANGE_PAIR_SER_DE,
- HYPER_LOG_LOG_SER_DE,
- QUANTILE_DIGEST_SER_DE,
- MAP_SER_DE,
- INT_SET_SER_DE,
- TDIGEST_SER_DE,
- DISTINCT_TABLE_SER_DE,
- DATA_SKETCH_SER_DE
- };
+  // Entry i must be the SerDe for the ObjectType whose value is i (e.g. ROARING_BITMAP_SERDE <-> Bitmap(13)).
+  private static final ObjectSerDe[] SER_DES = {
+      STRING_SER_DE,
+      LONG_SER_DE,
+      DOUBLE_SER_DE,
+      DOUBLE_ARRAY_LIST_SER_DE,
+      AVG_PAIR_SER_DE,
+      MIN_MAX_RANGE_PAIR_SER_DE,
+      HYPER_LOG_LOG_SER_DE,
+      QUANTILE_DIGEST_SER_DE,
+      MAP_SER_DE,
+      INT_SET_SER_DE,
+      TDIGEST_SER_DE,
+      DISTINCT_TABLE_SER_DE,
+      DATA_SKETCH_SER_DE,
+      ROARING_BITMAP_SERDE
+  };
//@formatter:on
public static byte[] serialize(Object value) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
new file mode 100644
index 0000000..5623a17
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountBitmapValueAggregator.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.aggregator;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import java.util.Objects;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * {@link ValueAggregator} for DISTINCTCOUNTBITMAP: collects a non-negative int hash of every raw value into a
+ * {@link MutableRoaringBitmap}. Raw {@code byte[]} values are treated as serialized bitmaps and OR-ed in.
+ * Aggregated values are stored as {@link DataType#BYTES}.
+ */
+public class DistinctCountBitmapValueAggregator implements ValueAggregator<Object, MutableRoaringBitmap> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Largest serialized size (in bytes) seen for any value created or updated by this aggregator.
+  private int _maxByteSize;
+
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  /**
+   * Creates the initial aggregated value: a {@code byte[]} is deserialized as a bitmap, any other value is hashed
+   * into a new single-element bitmap.
+   */
+  @Override
+  public MutableRoaringBitmap getInitialAggregatedValue(Object rawValue) {
+    MutableRoaringBitmap initialValue;
+    if (rawValue instanceof byte[]) {
+      byte[] bytes = (byte[]) rawValue;
+      initialValue = deserializeAggregatedValue(bytes);
+      _maxByteSize = Math.max(_maxByteSize, bytes.length);
+    } else {
+      initialValue = new MutableRoaringBitmap();
+      initialValue.add(hash(rawValue));
+      _maxByteSize = Math.max(_maxByteSize, initialValue.serializedSizeInBytes());
+    }
+    return initialValue;
+  }
+
+  /** Adds a raw value (or OR-s in a serialized bitmap) into {@code value} in place and returns it. */
+  @Override
+  public MutableRoaringBitmap applyRawValue(MutableRoaringBitmap value, Object rawValue) {
+    if (rawValue instanceof byte[]) {
+      value.or(deserializeAggregatedValue((byte[]) rawValue));
+    } else {
+      value.add(hash(rawValue));
+    }
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  /** OR-s {@code aggregatedValue} into {@code value} in place and returns {@code value}. */
+  @Override
+  public MutableRoaringBitmap applyAggregatedValue(MutableRoaringBitmap value, MutableRoaringBitmap aggregatedValue) {
+    value.or(aggregatedValue);
+    _maxByteSize = Math.max(_maxByteSize, value.serializedSizeInBytes());
+    return value;
+  }
+
+  /** Returns an independent deep copy; avoids the serialize/deserialize round trip. */
+  @Override
+  public MutableRoaringBitmap cloneAggregatedValue(MutableRoaringBitmap value) {
+    return value.clone();
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(MutableRoaringBitmap value) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.serialize(value);
+  }
+
+  @Override
+  public MutableRoaringBitmap deserializeAggregatedValue(byte[] bytes) {
+    return ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytes);
+  }
+
+  /**
+   * Maps a raw value to the non-negative int stored in the bitmap. DistinctCountBitmapAggregationFunction OR-s
+   * serialized bitmaps into bitmaps it fills with {@code hashCode() & Integer.MAX_VALUE}, so the two must agree.
+   */
+  private static int hash(Object rawValue) {
+    return Objects.hashCode(rawValue) & Integer.MAX_VALUE;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 021c188..37cebaf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -123,6 +123,8 @@ public class AggregationFunctionFactory {
return new DistinctCountAggregationFunction(firstArgument);
case DISTINCTCOUNTHLL:
return new DistinctCountHLLAggregationFunction(arguments);
+ case DISTINCTCOUNTBITMAP:
+ return new DistinctCountBitmapAggregationFunction(arguments);
case DISTINCTCOUNTRAWHLL:
return new DistinctCountRawHLLAggregationFunction(arguments);
case FASTHLL:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 2b5b615..aadb583 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -98,5 +98,8 @@ public class AggregationFunctionVisitorBase {
public void visit(DistinctCountThetaSketchAggregationFunction function) {
}
+
+  /** Visits a {@link DistinctCountBitmapAggregationFunction}; does nothing by default. */
+  public void visit(DistinctCountBitmapAggregationFunction function) {
+  }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 7e7ba4b..6077ba7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,18 +18,23 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
@@ -298,4 +303,70 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
}
return valueSet;
}
+
+  /**
+   * Ad-hoc micro-benchmark comparing IntOpenHashSet, MutableRoaringBitmap and HyperLogLog for distinct counting.
+   * Prints elapsed milliseconds and the resulting size/cardinality for each structure.
+   * NOTE(review): benchmark code does not belong in a production aggregation function; consider moving it to a
+   * dedicated (e.g. JMH) benchmark.
+   *
+   * @param args optional {@code [length [cardinality]]}; defaults to 30,000,000 rows and 10,000,000 distinct values
+   */
+  public static void main(String[] args) {
+    int length = args.length > 0 ? Integer.parseInt(args[0]) : 30_000_000;
+    int cardinality = args.length > 1 ? Integer.parseInt(args[1]) : 10_000_000;
+
+    double[] metric = new double[length];
+    String[] strings = new String[length];
+    for (int i = 0; i < length; i++) {
+      // Long arithmetic: i * 1001 overflows int once i exceeds ~2.1M, which yields negative values and a
+      // cardinality different from the intended one.
+      metric[i] = (i * 1001L) % cardinality;
+      strings[i] = "asdasdasdadsad" + i % cardinality;
+    }
+    System.out.println(" ================= ");
+    benchmarkStringHashSet(strings);
+    benchmarkDoubleHashSet(metric);
+    benchmarkBitmap(metric);
+    benchmarkHyperLogLog(metric);
+  }
+
+  /** Times collecting string hash codes into an IntOpenHashSet. */
+  private static void benchmarkStringHashSet(String[] strings) {
+    long start = System.currentTimeMillis();
+    IntOpenHashSet stringSet = new IntOpenHashSet();
+    for (String value : strings) {
+      stringSet.add(value.hashCode());
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(stringSet.size());
+    System.out.println(" ================= ");
+  }
+
+  /** Times collecting double hash codes into an IntOpenHashSet, then times serializing the set. */
+  private static void benchmarkDoubleHashSet(double[] metric) {
+    long start = System.currentTimeMillis();
+    IntOpenHashSet doubleSet = new IntOpenHashSet(1000);
+    for (double value : metric) {
+      doubleSet.add(Double.hashCode(value));
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(doubleSet.size());
+    start = System.currentTimeMillis();
+    byte[] serialized = ObjectSerDeUtils.serialize(doubleSet);
+    System.out.println("took = " + (System.currentTimeMillis() - start));
+    System.out.println(serialized.length);
+    System.out.println(" ================= ");
+  }
+
+  /** Times collecting double hashes into a bitmap, masked exactly as DistinctCountBitmapAggregationFunction does. */
+  private static void benchmarkBitmap(double[] metric) {
+    long start = System.currentTimeMillis();
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (double value : metric) {
+      bitmap.add(Double.hashCode(value) & Integer.MAX_VALUE);
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(bitmap.getCardinality());
+    System.out.println(bitmap.serializedSizeInBytes());
+    System.out.println(" ================= ");
+  }
+
+  /** Times offering every value to a HyperLogLog with log2m = 12. */
+  private static void benchmarkHyperLogLog(double[] metric) {
+    long start = System.currentTimeMillis();
+    HyperLogLog hll = new HyperLogLog(12);
+    for (double value : metric) {
+      hll.offer(value);
+    }
+    System.out.println(System.currentTimeMillis() - start);
+    System.out.println(hll.cardinality());
+    System.out.println(hll.sizeof());
+    System.out.println(" ================= ");
+  }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
new file mode 100644
index 0000000..1a10f07
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountBitmapAggregationFunction.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class DistinctCountBitmapAggregationFunction extends BaseSingleInputAggregationFunction<ImmutableRoaringBitmap, Long> {
+
+ public DistinctCountBitmapAggregationFunction(List<ExpressionContext> arguments) {
+ super(arguments.get(0));
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTBITMAP;
+ }
+
+ @Override
+ public void accept(AggregationFunctionVisitorBase visitor) {
+ visitor.visit(this);
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ DataType valueType = blockValSet.getValueType();
+
+ if (valueType != DataType.BYTES) {
+ MutableRoaringBitmap bitmap = getDefaultBitmap(aggregationResultHolder);
+
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ bitmap.add(stringValues[i].hashCode() & Integer.MAX_VALUE);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_HLL aggregation function: " + valueType);
+ }
+ } else {
+ // Serialized Bitmap
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+ if (bitmap != null) {
+ for (int i = 0; i < length; i++) {
+ bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]));
+ }
+ } else {
+ bitmap = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[0]).toMutableRoaringBitmap();
+ aggregationResultHolder.setValue(bitmap);
+ for (int i = 1; i < length; i++) {
+ bitmap.or(ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]));
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging HyperLogLogs", e);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ DataType valueType = blockValSet.getValueType();
+
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+ .add(Integer.hashCode(intValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+ .add(Float.hashCode(floatValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ getDefaultBitmap(groupByResultHolder, groupKeyArray[i])
+ .add(Double.hashCode(doubleValues[i]) & Integer.MAX_VALUE);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ getDefaultBitmap(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode() & Integer.MAX_VALUE);
+ }
+ break;
+ case BYTES:
+ // Serialized HyperLogLog
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ ImmutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]);
+ int groupKey = groupKeyArray[i];
+ MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+ if (bitmap != null) {
+ bitmap.or(value);
+ } else {
+ groupByResultHolder.setValueForKey(groupKey, value.toMutableRoaringBitmap());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging Bitmaps", e);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal data type for DISTINCT_COUNT_BITMAP aggregation function: " + valueType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ DataType valueType = blockValSet.getValueType();
+
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ int value = intValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getDefaultBitmap(groupByResultHolder, groupKey).add(Integer.hashCode(value) & Integer.MAX_VALUE);
+ }
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long value = longValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getDefaultBitmap(groupByResultHolder, groupKey).add(Long.hashCode(value) & Integer.MAX_VALUE);
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ float value = floatValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getDefaultBitmap(groupByResultHolder, groupKey).add(Float.hashCode(value) & Integer.MAX_VALUE);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ double value = doubleValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getDefaultBitmap(groupByResultHolder, groupKey).add(Double.hashCode(value) & Integer.MAX_VALUE);
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ String value = stringValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getDefaultBitmap(groupByResultHolder, groupKey).add(value.hashCode() & Integer.MAX_VALUE);
+ }
+ }
+ break;
+ case BYTES:
+ // Serialized HyperLogLog
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ ImmutableRoaringBitmap value = ObjectSerDeUtils.ROARING_BITMAP_SERDE.deserialize(bytesValues[i]);
+ for (int groupKey : groupKeysArray[i]) {
+ MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+ if (bitmap != null) {
+ bitmap.or(value);
+ } else {
+ // Create a new HyperLogLog for the group
+ groupByResultHolder
+ .setValueForKey(groupKey, value.toMutableRoaringBitmap());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging HyperLogLogs", e);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_HLL aggregation function: " + valueType);
+ }
+ }
+
+ @Override
+ public ImmutableRoaringBitmap extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ MutableRoaringBitmap hyperLogLog = aggregationResultHolder.getResult();
+ if (hyperLogLog == null) {
+ return new MutableRoaringBitmap();
+ } else {
+ return hyperLogLog;
+ }
+ }
+
+ @Override
+ public MutableRoaringBitmap extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+ MutableRoaringBitmap hyperLogLog = groupByResultHolder.getResult(groupKey);
+ if (hyperLogLog == null) {
+ return new MutableRoaringBitmap();
+ } else {
+ return hyperLogLog;
+ }
+ }
+
+ @Override
+ public ImmutableRoaringBitmap merge(ImmutableRoaringBitmap intermediateResult1,
+ ImmutableRoaringBitmap intermediateResult2) {
+ try {
+ intermediateResult1.or(intermediateResult2);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging HyperLogLogs", e);
+ }
+ return intermediateResult1;
+ }
+
+ @Override
+ public boolean isIntermediateResultComparable() {
+ return false;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+
+ @Override
+ public Long extractFinalResult(ImmutableRoaringBitmap intermediateResult) {
+ return Long.valueOf(intermediateResult.getCardinality());
+ }
+
+ /**
+ * Returns the HyperLogLog from the result holder or creates a new one with default log2m if it does not exist.
+ *
+ * @param aggregationResultHolder Result holder
+ * @return HyperLogLog from the result holder
+ */
+ protected MutableRoaringBitmap getDefaultBitmap(AggregationResultHolder aggregationResultHolder) {
+ MutableRoaringBitmap bitmap = aggregationResultHolder.getResult();
+ if (bitmap == null) {
+ bitmap = new MutableRoaringBitmap();
+ aggregationResultHolder.setValue(bitmap);
+ }
+ return bitmap;
+ }
+
+ /**
+ * Returns the HyperLogLog for the given group key if exists, or creates a new one with default log2m.
+ *
+ * @param groupByResultHolder Result holder
+ * @param groupKey Group key for which to return the HyperLogLog
+ * @return HyperLogLog for the group key
+ */
+ protected MutableRoaringBitmap getDefaultBitmap(GroupByResultHolder groupByResultHolder, int groupKey) {
+ MutableRoaringBitmap bitmap = groupByResultHolder.getResult(groupKey);
+ if (bitmap == null) {
+ bitmap = new MutableRoaringBitmap();
+ groupByResultHolder.setValueForKey(groupKey, bitmap);
+ }
+ return bitmap;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org