You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2023/05/25 18:57:06 UTC
[pinot] branch master updated: Integer Tuple Sketch support (#10427)
This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ded7e8f5ed Integer Tuple Sketch support (#10427)
ded7e8f5ed is described below
commit ded7e8f5ed63dbf41fb1cdff2e6bc86672e496aa
Author: Andi Miller <an...@andimiller.net>
AuthorDate: Thu May 25 19:56:58 2023 +0100
Integer Tuple Sketch support (#10427)
* Add support for Datasketches Integer Tuple Sketches
This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type.
The added classes currently support `Sum` as the semigroup, but are generic so others can be added.
Feature breakdown:
1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, e.g. `toIntegerSumTupleSketch(colA, colB, 16)`
2. Add Codecs that use the Datasketches serialization
3. Add aggregation functions:
* `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL
* `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch
* `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side
* `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side
4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations
5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
* fix style
* add test for sketch agg
* fix mangled license headers
* annotate types for old versions of java
* Cache Tuple Union result so it's not recomputed
* Improve null handling in Tuple aggregation functions
* Cleanup in IntegerTupleSketchAggregationFunction's parameters
* Make Theta and Tuple transform functions throw on unexpected key types
* Clean up sum/avg implementations for Tuple Sketch values
* Fix on Java 8
* Expand todo for tuple sketch aggregation function
* add preconditions to tuple aggregation function
* empty commit to re-trigger CI
* empty commit to re-trigger CI again
* fix merge
* empty commit to re-trigger CI again
* fix merge
* fix merge again
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 30 ++-
.../core/function/scalar/SketchFunctions.java | 81 ++++++--
.../function/AggregationFunctionFactory.java | 10 +
...ValueIntegerTupleSketchAggregationFunction.java | 70 +++++++
...CountIntegerTupleSketchAggregationFunction.java | 54 +++++
.../IntegerTupleSketchAggregationFunction.java | 222 +++++++++++++++++++++
...aluesIntegerTupleSketchAggregationFunction.java | 64 ++++++
.../aggregator/IntegerTupleSketchAggregator.java | 42 ++++
.../aggregator/ValueAggregatorFactory.java | 6 +
.../core/function/scalar/SketchFunctionsTest.java | 19 ++
...ctCountIntegerSumTupleSketchStarTreeV2Test.java | 57 ++++++
.../IntegerTupleSketchValueAggregator.java | 99 +++++++++
.../local/aggregator/ValueAggregatorFactory.java | 11 +
.../segment/local/utils/CustomSerDeUtils.java | 24 +++
.../IntegerTupleSketchValueAggregatorTest.java | 70 +++++++
.../pinot/segment/spi/AggregationFunctionType.java | 9 +
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
17 files changed, 856 insertions(+), 15 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 14af9bac95..d93ea17ee9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -60,6 +60,8 @@ import java.util.Set;
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -131,7 +133,8 @@ public class ObjectSerDeUtils {
VarianceTuple(33),
PinotFourthMoment(34),
ArgMinMaxObject(35),
- KllDataSketch(36);
+ KllDataSketch(36),
+ IntegerTupleSketch(37);
private final int _value;
@@ -219,6 +222,8 @@ public class ObjectSerDeUtils {
return ObjectType.VarianceTuple;
} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
+ } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+ return ObjectType.IntegerTupleSketch;
} else if (value instanceof ArgMinMaxObject) {
return ObjectType.ArgMinMaxObject;
} else {
@@ -926,6 +931,28 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+ new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+ @Override
+ public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+ return value.compact().toByteArray();
+ }
+
+ @Override
+ public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+ return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+ new IntegerSummaryDeserializer());
+ }
+
+ @Override
+ public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+ new IntegerSummaryDeserializer());
+ }
+ };
+
public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() {
@Override
@@ -1298,6 +1325,7 @@ public class ObjectSerDeUtils {
PINOT_FOURTH_MOMENT_OBJECT_SER_DE,
ARG_MIN_MAX_OBJECT_SER_DE,
KLL_SKETCH_SER_DE,
+ DATA_SKETCH_INT_TUPLE_SER_DE,
};
//@formatter:on
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 0c55880526..f6245bec6f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -23,6 +23,8 @@ import java.math.BigDecimal;
import javax.annotation.Nullable;
import org.apache.datasketches.theta.Sketches;
import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.spi.annotations.ScalarFunction;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -87,20 +89,24 @@ public class SketchFunctions {
@ScalarFunction(nullableParameters = true)
public static byte[] toThetaSketch(@Nullable Object input, int nominalEntries) {
UpdateSketch sketch = Sketches.updateSketchBuilder().setNominalEntries(nominalEntries).build();
- if (input instanceof Integer) {
- sketch.update((Integer) input);
- } else if (input instanceof Long) {
- sketch.update((Long) input);
- } else if (input instanceof Float) {
- sketch.update((Float) input);
- } else if (input instanceof Double) {
- sketch.update((Double) input);
- } else if (input instanceof BigDecimal) {
- sketch.update(((BigDecimal) input).toString());
- } else if (input instanceof String) {
- sketch.update((String) input);
- } else if (input instanceof byte[]) {
- sketch.update((byte[]) input);
+ // A null input leaves the sketch empty; any non-null input of an unrecognised type is rejected
+ // with an IllegalArgumentException instead of being silently ignored.
+ if (input != null) {
+ if (input instanceof Integer) {
+ sketch.update((Integer) input);
+ } else if (input instanceof Long) {
+ sketch.update((Long) input);
+ } else if (input instanceof Float) {
+ sketch.update((Float) input);
+ } else if (input instanceof Double) {
+ sketch.update((Double) input);
+ } else if (input instanceof BigDecimal) {
+ // BigDecimal values are fed to the sketch via their string representation.
+ sketch.update(((BigDecimal) input).toString());
+ } else if (input instanceof String) {
+ sketch.update((String) input);
+ } else if (input instanceof byte[]) {
+ sketch.update((byte[]) input);
+ } else {
+ throw new IllegalArgumentException("Unrecognised input type for Theta sketch: " + input.getClass().getName());
+ }
}
return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact());
}
@@ -131,4 +137,51 @@ public class SketchFunctions {
}
return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
}
+
+  /**
+   * Create an Integer Tuple Sketch (Sum mode) containing the key and value supplied, using
+   * {@code CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK} as lgK.
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   * @throws IllegalArgumentException if the key is non-null and of an unsupported type
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create an Integer Tuple Sketch (Sum mode) containing the key and value supplied.
+   *
+   * <p>Supported key types are Integer, Long, Float, Double, BigDecimal (hashed via its string form), String and
+   * byte[]. If either the key or the value is null, an empty sketch is returned.
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   * @throws IllegalArgumentException if both key and value are non-null and the key is of an unsupported type
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value, int lgK) {
+    IntegerSketch sketch = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (key != null && value != null) {
+      if (key instanceof Integer) {
+        sketch.update((Integer) key, value);
+      } else if (key instanceof Long) {
+        sketch.update((Long) key, value);
+      } else if (key instanceof Float) {
+        sketch.update((Float) key, value);
+      } else if (key instanceof Double) {
+        sketch.update((Double) key, value);
+      } else if (key instanceof BigDecimal) {
+        sketch.update(((BigDecimal) key).toString(), value);
+      } else if (key instanceof String) {
+        sketch.update((String) key, value);
+      } else if (key instanceof byte[]) {
+        sketch.update((byte[]) key, value);
+      } else {
+        throw new IllegalArgumentException("Unrecognised key type for Tuple sketch: " + key.getClass().getName());
+      }
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(sketch.compact());
+  }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index f61375dc06..06fbb1db66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -336,6 +337,15 @@ public class AggregationFunctionFactory {
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
case FOURTHMOMENT:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+ case DISTINCTCOUNTTUPLESKETCH:
+ // mode actually doesn't matter here because we only care about keys, not values
+ return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ return new AvgValueIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
case PARENTARGMAX:
return new ParentArgMinMaxAggregationFunction(arguments, true);
case PARENTARGMIN:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..7ef6633619
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Estimates the average value of an Integer Tuple Sketch column: all intermediate sketches are unioned and the
+ * mean of the retained summary values is returned, rounded to a long.
+ */
+public class AvgValueIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO: once other summary modes are supported, pick the AggregationFunctionType matching the mode
+  // instead of always reporting the SUM variant.
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * Unions the sketches and averages the retained summary values.
+   *
+   * @return the rounded average, or null when there is no input or the union retains no entries
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      return null;
+    }
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) {
+      union.union(sketch);
+    }
+    CompactSketch<IntegerSummary> merged = union.getResult();
+    int retainedEntries = merged.getRetainedEntries();
+    if (retainedEntries == 0) {
+      // Nothing to average.
+      return null;
+    }
+    double retainedSum = 0.0;
+    SketchIterator<IntegerSummary> it = merged.iterator();
+    while (it.next()) {
+      retainedSum += it.getSummary().getValue();
+    }
+    return Math.round(retainedSum / retainedEntries);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..087337472d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Estimates the number of distinct keys across Integer Tuple Sketches. Only the key side of the sketches is used
+ * (via the union's estimate), so the summary mode passed in does not influence the result.
+ */
+public class DistinctCountIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public DistinctCountIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments,
+      IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * Unions all sketches and returns the truncated distinct-key estimate.
+   *
+   * @param integerSummarySketches intermediate sketches; null when no sketch was ever collected
+   * @return the estimated number of distinct keys, 0 when there are no sketches
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      // The result holder was never populated: there are no keys to count. Without this guard the forEach below
+      // throws a NullPointerException (the sibling Tuple Sketch functions guard the same way).
+      return 0L;
+    }
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    // Narrowing cast truncates toward zero, matching Double#longValue.
+    return (long) union.getResult().getEstimate();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..fde88dc808
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+ extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+ final ExpressionContext _expressionContext;
+ final IntegerSummarySetOperations _setOps;
+ final int _entries;
+
+ public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+ super(arguments.get(0));
+
+ Preconditions.checkArgument(arguments.size() <= 2,
+ "Tuple Sketch Aggregation Function expects at most 2 arguments, got: %s", arguments.size());
+ _expressionContext = arguments.get(0);
+ _setOps = new IntegerSummarySetOperations(mode, mode);
+ if (arguments.size() == 2) {
+ FieldSpec.DataType dataType = arguments.get(1).getLiteral().getType();
+ Preconditions.checkArgument(dataType == FieldSpec.DataType.LONG || dataType == FieldSpec.DataType.INT,
+ "Tuple Sketch Aggregation Function expected the second argument to be a number of entries to keep, but it "
+ + "was of type %s",
+ dataType.toString());
+ _entries = ((Long) arguments.get(1).getLiteral().getValue()).intValue();
+ } else {
+ _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+ }
+ }
+
+ // TODO if extra aggregation modes are supported, make this switch
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized Integer Tuple Sketch
+ FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+ if (storedType == FieldSpec.DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ List<CompactSketch<IntegerSummary>> integerSketch = aggregationResultHolder.getResult();
+ if (integerSketch != null) {
+ List<CompactSketch<IntegerSummary>> sketches =
+ Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+ .map(Sketch::compact).collect(Collectors.toList());
+ aggregationResultHolder.setValue(merge(aggregationResultHolder.getResult(), sketches));
+ } else {
+ List<CompactSketch<IntegerSummary>> sketches =
+ Arrays.stream(bytesValues).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+ .map(Sketch::compact).collect(Collectors.toList());
+ aggregationResultHolder.setValue(sketches);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+ }
+ } else {
+ throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+ // Treat BYTES value as serialized Integer Tuple Sketch
+ FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+ if (storedType == FieldSpec.DataType.BYTES) {
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ try {
+ for (int i = 0; i < length; i++) {
+ byte[] value = bytesValues[i];
+ int groupKey = groupKeyArray[i];
+ CompactSketch<IntegerSummary> newSketch =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+ if (groupByResultHolder.getResult(groupKey) == null) {
+ ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+ newList.add(newSketch);
+ groupByResultHolder.setValueForKey(groupKey, newList);
+ } else {
+ groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+ }
+ } else {
+ throw new IllegalStateException(
+ "Illegal data type for INTEGER_TUPLE_SKETCH_UNION aggregation function: " + storedType);
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ byte[] value = valueArray[i];
+ CompactSketch<IntegerSummary> newSketch =
+ ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+ for (int groupKey : groupKeysArray[i]) {
+ if (groupByResultHolder.getResult(groupKey) == null) {
+ groupByResultHolder.setValueForKey(groupKey, Collections.singletonList(newSketch));
+ } else {
+ groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+ }
+ }
+ }
+ }
+
+ @Override
+ public List<CompactSketch<IntegerSummary>> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ return aggregationResultHolder.getResult();
+ }
+
+ @Override
+ public List<CompactSketch<IntegerSummary>> extractGroupByResult(GroupByResultHolder groupByResultHolder,
+ int groupKey) {
+ return groupByResultHolder.getResult(groupKey);
+ }
+
+ @Override
+ public List<CompactSketch<IntegerSummary>> merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
+ List<CompactSketch<IntegerSummary>> intermediateResult2) {
+ if (intermediateResult1 == null && intermediateResult2 != null) {
+ return intermediateResult2;
+ } else if (intermediateResult1 != null && intermediateResult2 == null) {
+ return intermediateResult1;
+ } else if (intermediateResult1 == null && intermediateResult2 == null) {
+ return new ArrayList<>(0);
+ }
+ ArrayList<CompactSketch<IntegerSummary>> merged =
+ new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
+ merged.addAll(intermediateResult1);
+ merged.addAll(intermediateResult2);
+ return merged;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+ if (integerSummarySketches == null) {
+ return null;
+ }
+ Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+ integerSummarySketches.forEach(union::union);
+ return Base64.getEncoder().encodeToString(union.getResult().toByteArray());
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..0167c7a0cf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Estimates the sum of the values of an Integer Tuple Sketch column: all intermediate sketches are unioned, the
+ * retained summary values are summed and the total is scaled by 1/theta, then rounded to a long.
+ */
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO: once other summary modes are supported, pick the AggregationFunctionType matching the mode.
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * @return the rounded estimated sum of values, or null when there are no sketches
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      return null;
+    }
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) {
+      union.union(sketch);
+    }
+    CompactSketch<IntegerSummary> merged = union.getResult();
+    double sampledSum = 0.0;
+    for (SketchIterator<IntegerSummary> it = merged.iterator(); it.next(); ) {
+      sampledSum += it.getSummary().getValue();
+    }
+    // Scale the retained sample up by 1/theta to estimate the total over all keys.
+    return Math.round(sampledSum / merged.getTheta());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
new file mode 100644
index 0000000000..8bdf7f8a86
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+
+
+/**
+ * Merges two serialized Integer tuple sketches during segment processing by unioning them with summary set
+ * operations built from the configured {@link IntegerSummary.Mode}.
+ */
+public class IntegerTupleSketchAggregator implements ValueAggregator {
+  private final IntegerSummary.Mode _mode;
+
+  /**
+   * @param mode summary mode passed to {@link IntegerSummarySetOperations} for both union and intersection
+   */
+  public IntegerTupleSketchAggregator(IntegerSummary.Mode mode) {
+    _mode = mode;
+  }
+
+  /**
+   * Unions two sketches given in serialized form.
+   *
+   * @param value1 serialized Integer tuple sketch ({@code byte[]})
+   * @param value2 serialized Integer tuple sketch ({@code byte[]})
+   * @return the serialized union of both sketches
+   */
+  @Override
+  public Object aggregate(Object value1, Object value2) {
+    Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
+    Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
+    // NOTE(review): this Union is created without explicit nominal entries (presumably the library default);
+    // sketches built with a larger K may be down-sampled by the merge - confirm this is intended.
+    Sketch<IntegerSummary> result = new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(first, second);
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 54824255dd..4cd5a1ea6d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.processing.aggregator;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -46,6 +47,11 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTTHETASKETCH:
case DISTINCTCOUNTRAWTHETASKETCH:
return new DistinctCountThetaSketchAggregator();
+ case DISTINCTCOUNTTUPLESKETCH:
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ return new IntegerTupleSketchAggregator(IntegerSummary.Mode.Sum);
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index 4496bbad15..b62d363c4f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -44,6 +44,8 @@ public class SketchFunctionsTest {
}
Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null)), 0.0);
Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null, 1024)), 0.0);
+ Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object()));
+ Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object(), 1024));
}
private long hllEstimate(byte[] bytes) {
@@ -59,4 +61,24 @@ public class SketchFunctionsTest {
Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null)), 0);
Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null, 8)), 0);
}
+
+  // Deserializes an Integer tuple sketch and returns its estimate of distinct keys.
+  private double intTupleEstimate(byte[] serializedSketch) {
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(serializedSketch).getEstimate();
+  }
+
+  // Single-value sketches estimate one key, null inputs estimate zero, and unsupported objects are rejected.
+  @Test
+  public void intTupleSumCreation() {
+    for (Object input : _inputs) {
+      Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(input, 1)), 1.0d);
+      Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(input, 1, 16)), 1.0d);
+    }
+    Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1)), 0.0d);
+    Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1, 16)), 0.0d);
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1));
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1, 1024));
+  }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java
new file mode 100644
index 0000000000..b9c52bf958
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.aggregator.IntegerTupleSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Star-tree V2 test for Integer tuple sketches aggregated with {@link IntegerSummary.Mode#Sum}. Each raw value is a
+ * serialized sketch holding one random key/value pair.
+ */
+public class DistinctCountIntegerSumTupleSketchStarTreeV2Test
+    extends BaseStarTreeV2Test<byte[], Sketch<IntegerSummary>> {
+
+  @Override
+  ValueAggregator<byte[], Sketch<IntegerSummary>> getValueAggregator() {
+    return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.BYTES;
+  }
+
+  // A small sketch (lgK = 4) with a single key in [0, 100) mapped to a value in [0, 100).
+  @Override
+  byte[] getRandomRawValue(Random random) {
+    int key = random.nextInt(100);
+    int value = random.nextInt(100);
+    IntegerSketch sketch = new IntegerSketch(4, IntegerSummary.Mode.Sum);
+    sketch.update(key, value);
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(sketch.compact());
+  }
+
+  // Only the estimates are compared; the aggregated summary values are not checked.
+  @Override
+  void assertAggregatedValue(Sketch<IntegerSummary> starTreeResult, Sketch<IntegerSummary> nonStarTreeResult) {
+    double actual = starTreeResult.getEstimate();
+    double expected = nonStarTreeResult.getEstimate();
+    assertEquals(actual, expected);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
new file mode 100644
index 0000000000..1440e738d1
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
+ public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+ // This changes a lot similar to the Bitmap aggregator
+ private int _maxByteSize;
+
+ private final IntegerSummary.Mode _mode;
+
+ public IntegerTupleSketchValueAggregator(IntegerSummary.Mode mode) {
+ _mode = mode;
+ }
+
+ @Override
+ public AggregationFunctionType getAggregationType() {
+ return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
+ }
+
+ @Override
+ public DataType getAggregatedValueType() {
+ return AGGREGATED_VALUE_TYPE;
+ }
+
+ // Utility method to merge two sketches
+ private Sketch<IntegerSummary> union(Sketch<IntegerSummary> a, Sketch<IntegerSummary> b) {
+ return new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(a, b);
+ }
+
+ @Override
+ public Sketch<IntegerSummary> getInitialAggregatedValue(byte[] rawValue) {
+ Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
+ _maxByteSize = Math.max(_maxByteSize, rawValue.length);
+ return initialValue;
+ }
+
+ @Override
+ public Sketch<IntegerSummary> applyRawValue(Sketch<IntegerSummary> value, byte[] rawValue) {
+ Sketch<IntegerSummary> right = deserializeAggregatedValue(rawValue);
+ Sketch<IntegerSummary> result = union(value, right).compact();
+ _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length);
+ return result;
+ }
+
+ @Override
+ public Sketch<IntegerSummary> applyAggregatedValue(Sketch<IntegerSummary> value,
+ Sketch<IntegerSummary> aggregatedValue) {
+ Sketch<IntegerSummary> result = union(value, aggregatedValue);
+ _maxByteSize = Math.max(_maxByteSize, result.toByteArray().length);
+ return result;
+ }
+
+ @Override
+ public Sketch<IntegerSummary> cloneAggregatedValue(Sketch<IntegerSummary> value) {
+ return deserializeAggregatedValue(serializeAggregatedValue(value));
+ }
+
+ @Override
+ public int getMaxAggregatedValueByteSize() {
+ return _maxByteSize;
+ }
+
+ @Override
+ public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
+ return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);
+ }
+
+ @Override
+ public Sketch<IntegerSummary> deserializeAggregatedValue(byte[] bytes) {
+ return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes);
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index aa4bdb410b..b4f90c4952 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.segment.local.aggregator;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -66,6 +67,11 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTTHETASKETCH:
case DISTINCTCOUNTRAWTHETASKETCH:
return new DistinctCountThetaSketchValueAggregator();
+ case DISTINCTCOUNTTUPLESKETCH:
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
@@ -107,6 +113,11 @@ public class ValueAggregatorFactory {
case DISTINCTCOUNTTHETASKETCH:
case DISTINCTCOUNTRAWTHETASKETCH:
return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+ case DISTINCTCOUNTTUPLESKETCH:
+ case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+ case AVGVALUEINTEGERSUMTUPLESKETCH:
+ case SUMVALUESINTEGERSUMTUPLESKETCH:
+ return IntegerTupleSketchValueAggregator.AGGREGATED_VALUE_TYPE;
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index 289715680b..1ed3a3e341 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -26,6 +26,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
import org.apache.pinot.segment.local.customobject.AvgPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
@@ -228,6 +230,28 @@ public class CustomSerDeUtils {
}
};
+  // Integer tuple sketch SerDe: always writes the compact form and reads it back with heapifySketch.
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] remaining = new byte[byteBuffer.remaining()];
+          byteBuffer.get(remaining); // copies the remaining bytes and advances the position
+          return deserialize(remaining);
+        }
+      };
+
public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new ObjectSerDe<RoaringBitmap>() {
@Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
new file mode 100644
index 0000000000..d108d799b0
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class IntegerTupleSketchValueAggregatorTest {
+
+ private byte[] sketchContaining(String key, int value) {
+ IntegerSketch is = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ is.update(key, value);
+ return is.compact().toByteArray();
+ };
+
+ @Test
+ public void initialShouldParseASketch() {
+ IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+ assertEquals(agg.getInitialAggregatedValue(sketchContaining("hello world", 1)).getEstimate(), 1.0);
+ }
+
+ @Test
+ public void applyAggregatedValueShouldUnion() {
+ IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ s1.update("a", 1);
+ s2.update("b", 1);
+ IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+ Sketch<IntegerSummary> merged = agg.applyAggregatedValue(s1, s2);
+ assertEquals(merged.getEstimate(), 2.0);
+
+ // and should update the max size
+ assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length);
+ }
+
+ @Test
+ public void applyRawValueShouldUnion() {
+ IntegerSketch s1 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ IntegerSketch s2 = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+ s1.update("a", 1);
+ s2.update("b", 1);
+ IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+ Sketch<IntegerSummary> merged = agg.applyRawValue(s1, agg.serializeAggregatedValue(s2));
+ assertEquals(merged.getEstimate(), 2.0);
+
+ // and should update the max size
+ assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length);
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index a9d2085c8f..7b2a02d666 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -72,6 +72,15 @@ public enum AggregationFunctionType {
KURTOSIS("kurtosis"),
FOURTHMOMENT("fourthmoment"),
+ // DataSketches Tuple Sketch support
+ DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"),
+
+ // DataSketches Tuple Sketch support for Integer based Tuple Sketches
+ DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"),
+
+ SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"),
+ AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"),
+
// Geo aggregation functions
STUNION("STUnion"),
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 585c56520c..4dc7496c3a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -96,6 +96,9 @@ public class CommonConstants {
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+
+ public static final int DEFAULT_TUPLE_SKETCH_LGK = 16; // presumably log2 of default nominal entries (2^16 = 65536); confirm
+
// Whether to rewrite DistinctCount to DistinctCountBitmap
public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org