You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2023/05/25 18:57:06 UTC

[pinot] branch master updated: Integer Tuple Sketch support (#10427)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ded7e8f5ed Integer Tuple Sketch support (#10427)
ded7e8f5ed is described below

commit ded7e8f5ed63dbf41fb1cdff2e6bc86672e496aa
Author: Andi Miller <an...@andimiller.net>
AuthorDate: Thu May 25 19:56:58 2023 +0100

    Integer Tuple Sketch support (#10427)
    
    * Add support for Datasketches Integer Tuple Sketches
    
    This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type.
    
    The added classes currently support `Sum` as the semigroup, but are generic so others can be added.
    
    Feature breakdown:
    
    1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, eg. `toIntegerSumTupleSketch(colA, colB, 16)`
    2. Add Codecs that use the Datasketches serialization
    3. Add aggregation functions:
      * `DISTINCT_COUNT_TUPLE_SKETCH` will just get the estimate for the number of unique keys, same as Theta or HLL
      * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and return the raw sketch
      * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the sum of the value side
      * `AVG_VALUE_INTEGER_SUM_TUPLE_SKETCH` will merge the sketches using `Sum` as the semigroup and estimate the average of the value side
    4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all 4 above aggregations
    5. Add `ValueAggregator`s for use in rollups for all 4 above aggregations
    
    * fix style
    
    * add test for sketch agg
    
    * fix mangled license headers
    
    * annotate types for old versions of java
    
    * Cache Tuple Union result so it's not recomputed
    
    * Improve null handling in Tuple aggregation functions
    
    * Cleanup in IntegerTupleSketchAggregationFunction's parameters
    
    * Make Theta and Tuple transform functions throw on unexpected key types
    
    * Clean up sum/avg implementations for Tuple Sketch values
    
    * Fix on Java 8
    
    * Expand todo for tuple sketch aggregation function
    
    * add preconditions to tuple aggregation function
    
    * empty commit to re-trigger CI
    
    * empty commit to re-trigger CI again
    
    * fix merge
    
    * empty commit to re-trigger CI again
    
    * fix merge
    
    * fix merge again
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  30 ++-
 .../core/function/scalar/SketchFunctions.java      |  81 ++++++--
 .../function/AggregationFunctionFactory.java       |  10 +
 ...ValueIntegerTupleSketchAggregationFunction.java |  70 +++++++
 ...CountIntegerTupleSketchAggregationFunction.java |  54 +++++
 .../IntegerTupleSketchAggregationFunction.java     | 222 +++++++++++++++++++++
 ...aluesIntegerTupleSketchAggregationFunction.java |  64 ++++++
 .../aggregator/IntegerTupleSketchAggregator.java   |  42 ++++
 .../aggregator/ValueAggregatorFactory.java         |   6 +
 .../core/function/scalar/SketchFunctionsTest.java  |  19 ++
 ...ctCountIntegerSumTupleSketchStarTreeV2Test.java |  57 ++++++
 .../IntegerTupleSketchValueAggregator.java         |  99 +++++++++
 .../local/aggregator/ValueAggregatorFactory.java   |  11 +
 .../segment/local/utils/CustomSerDeUtils.java      |  24 +++
 .../IntegerTupleSketchValueAggregatorTest.java     |  70 +++++++
 .../pinot/segment/spi/AggregationFunctionType.java |   9 +
 .../apache/pinot/spi/utils/CommonConstants.java    |   3 +
 17 files changed, 856 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 14af9bac95..d93ea17ee9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -60,6 +60,8 @@ import java.util.Set;
 import org.apache.datasketches.kll.KllDoublesSketch;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.core.query.aggregation.utils.argminmax.ArgMinMaxObject;
 import org.apache.pinot.core.query.distinct.DistinctTable;
@@ -131,7 +133,8 @@ public class ObjectSerDeUtils {
     VarianceTuple(33),
     PinotFourthMoment(34),
     ArgMinMaxObject(35),
-    KllDataSketch(36);
+    KllDataSketch(36),
+    IntegerTupleSketch(37);
 
     private final int _value;
 
@@ -219,6 +222,8 @@ public class ObjectSerDeUtils {
         return ObjectType.VarianceTuple;
       } else if (value instanceof PinotFourthMoment) {
         return ObjectType.PinotFourthMoment;
+      } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
+        return ObjectType.IntegerTupleSketch;
       } else if (value instanceof ArgMinMaxObject) {
         return ObjectType.ArgMinMaxObject;
       } else {
@@ -926,6 +931,28 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(Memory.wrap(bytes),
+              new IntegerSummaryDeserializer());
+        }
+      };
+
   public static final ObjectSerDe<KllDoublesSketch> KLL_SKETCH_SER_DE = new ObjectSerDe<KllDoublesSketch>() {
 
     @Override
@@ -1298,6 +1325,7 @@ public class ObjectSerDeUtils {
       PINOT_FOURTH_MOMENT_OBJECT_SER_DE,
       ARG_MIN_MAX_OBJECT_SER_DE,
       KLL_SKETCH_SER_DE,
+      DATA_SKETCH_INT_TUPLE_SER_DE,
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 0c55880526..f6245bec6f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -23,6 +23,8 @@ import java.math.BigDecimal;
 import javax.annotation.Nullable;
 import org.apache.datasketches.theta.Sketches;
 import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.spi.annotations.ScalarFunction;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -87,20 +89,24 @@ public class SketchFunctions {
   @ScalarFunction(nullableParameters = true)
   public static byte[] toThetaSketch(@Nullable Object input, int nominalEntries) {
     UpdateSketch sketch = Sketches.updateSketchBuilder().setNominalEntries(nominalEntries).build();
-    if (input instanceof Integer) {
-      sketch.update((Integer) input);
-    } else if (input instanceof Long) {
-      sketch.update((Long) input);
-    } else if (input instanceof Float) {
-      sketch.update((Float) input);
-    } else if (input instanceof Double) {
-      sketch.update((Double) input);
-    } else if (input instanceof BigDecimal) {
-      sketch.update(((BigDecimal) input).toString());
-    } else if (input instanceof String) {
-      sketch.update((String) input);
-    } else if (input instanceof byte[]) {
-      sketch.update((byte[]) input);
+    if (input != null) {
+      if (input instanceof Integer) {
+        sketch.update((Integer) input);
+      } else if (input instanceof Long) {
+        sketch.update((Long) input);
+      } else if (input instanceof Float) {
+        sketch.update((Float) input);
+      } else if (input instanceof Double) {
+        sketch.update((Double) input);
+      } else if (input instanceof BigDecimal) {
+        sketch.update(((BigDecimal) input).toString());
+      } else if (input instanceof String) {
+        sketch.update((String) input);
+      } else if (input instanceof byte[]) {
+        sketch.update((byte[]) input);
+      } else {
+        throw new IllegalArgumentException("Unrecognised input type for Theta sketch: " + input.getClass().getName());
+      }
     }
     return ObjectSerDeUtils.DATA_SKETCH_SER_DE.serialize(sketch.compact());
   }
@@ -131,4 +137,51 @@ public class SketchFunctions {
     }
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hll);
   }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer we want to associate as the value to go along with the key, may be null to return an
+   *              empty sketch
+   * @return serialized tuple sketch as bytes
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value) {
+    return toIntegerSumTupleSketch(key, value, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+  }
+
+  /**
+   * Create a Tuple Sketch containing the key and value supplied, using Sum as the summary mode
+   *
+   * @param key an Object we want to insert as the key of the sketch, may be null to return an empty sketch
+   * @param value an Integer to associate with the key, may be null to return an empty sketch
+   * @param lgK integer representing the log of the maximum number of retained entries in the sketch, between 4 and 26
+   * @return serialized tuple sketch as bytes
+   * @throws IllegalArgumentException if key and value are both non-null and the key type is not supported
+   */
+  @ScalarFunction(nullableParameters = true)
+  public static byte[] toIntegerSumTupleSketch(@Nullable Object key, @Nullable Integer value, int lgK) {
+    IntegerSketch is = new IntegerSketch(lgK, IntegerSummary.Mode.Sum);
+    if (value != null && key != null) {
+      if (key instanceof Integer) {
+        is.update((Integer) key, value);
+      } else if (key instanceof Long) {
+        is.update((Long) key, value);
+      } else if (key instanceof Float) {
+        is.update((float) key, value);
+      } else if (key instanceof Double) {
+        is.update((double) key, value);
+      } else if (key instanceof BigDecimal) {
+        is.update(((BigDecimal) key).toString(), value);
+      } else if (key instanceof String) {
+        is.update((String) key, value);
+      } else if (key instanceof byte[]) {
+        is.update((byte[]) key, value);
+      } else {
+        throw new IllegalArgumentException("Unrecognised key type for Tuple sketch: " + key.getClass().getName());
+      }
+    }
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(is.compact());
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index f61375dc06..06fbb1db66 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -336,6 +337,15 @@ public class AggregationFunctionFactory {
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
           case FOURTHMOMENT:
             return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
+          case DISTINCTCOUNTTUPLESKETCH:
+            // mode actually doesn't matter here because we only care about keys, not values
+            return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+            return new IntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case SUMVALUESINTEGERSUMTUPLESKETCH:
+            return new SumValuesIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
+          case AVGVALUEINTEGERSUMTUPLESKETCH:
+            return new AvgValueIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
           case PARENTARGMAX:
             return new ParentArgMinMaxAggregationFunction(arguments, true);
           case PARENTARGMIN:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..7ef6633619
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgValueIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Aggregation function that unions Integer Tuple Sketches and returns the rounded mean of the summary values
+ * retained by the union.
+ */
+public class AvgValueIntegerTupleSketchAggregationFunction
+    extends IntegerTupleSketchAggregationFunction {
+
+  public AvgValueIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  // ie, if a Mode argument other than SUM is passed in, switch to the matching AggregationFunctionType
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * Unions the collected sketches and averages the summary values of the entries the union retains.
+   *
+   * @param integerSummarySketches intermediate list of sketches, may be null
+   * @return the mean retained summary value rounded to the nearest long, or null when the input is null or the
+   *         union retains no entries
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      return null;
+    }
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) {
+      union.union(sketch);
+    }
+    CompactSketch<IntegerSummary> merged = union.getResult();
+    int retainedEntries = merged.getRetainedEntries();
+    if (retainedEntries == 0) {
+      // there is nothing to average, return null
+      return null;
+    }
+    double retainedTotal = 0;
+    SketchIterator<IntegerSummary> entries = merged.iterator();
+    while (entries.next()) {
+      retainedTotal += entries.getSummary().getValue();
+    }
+    return Math.round(retainedTotal / retainedEntries);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..087337472d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Aggregation function that unions Integer Tuple Sketches and returns the estimated number of distinct keys; the
+ * summary values play no part in the result.
+ */
+public class DistinctCountIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public DistinctCountIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments,
+      IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * Unions the collected sketches and returns the estimated number of distinct keys, truncated to a long.
+   *
+   * @param integerSummarySketches intermediate list of sketches; null is treated like an empty list
+   * @return the distinct-key estimate of the union, i.e. the estimate of an empty union when nothing was merged
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    // Sibling tuple sketch functions accept a null intermediate result; do the same rather than throw an NPE
+    if (integerSummarySketches != null) {
+      integerSummarySketches.forEach(union::union);
+    }
+    return Double.valueOf(union.getResult().getEstimate()).longValue();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..fde88dc808
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/IntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/***
+ * This is the base class for all Integer Tuple Sketch aggregations
+ *
+ * Note that it only supports BYTES columns containing serialized sketches currently, but could be expanded to more
+ */
+public class IntegerTupleSketchAggregationFunction
+    extends BaseSingleInputAggregationFunction<List<CompactSketch<IntegerSummary>>, Comparable> {
+  final ExpressionContext _expressionContext;
+  final IntegerSummarySetOperations _setOps;
+  final int _entries;
+
+  /**
+   * @param arguments the sketch column expression, optionally followed by a literal number of entries to keep
+   * @param mode summary mode handed to the set operations that are used when sketches are unioned
+   * @throws IllegalArgumentException if more than 2 arguments are given or the second is not an INT/LONG literal
+   */
+  public IntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments.get(0));
+
+    Preconditions.checkArgument(arguments.size() <= 2,
+        "Tuple Sketch Aggregation Function expects at most 2 arguments, got: %s", arguments.size());
+    _expressionContext = arguments.get(0);
+    _setOps = new IntegerSummarySetOperations(mode, mode);
+    if (arguments.size() == 2) {
+      FieldSpec.DataType dataType = arguments.get(1).getLiteral().getType();
+      Preconditions.checkArgument(dataType == FieldSpec.DataType.LONG || dataType == FieldSpec.DataType.INT,
+          "Tuple Sketch Aggregation Function expected the second argument to be a number of entries to keep, but it "
+              + "was of type %s",
+          dataType.toString());
+      // INT and LONG literals both pass the check above, so convert via Number instead of casting straight to Long
+      _entries = ((Number) arguments.get(1).getLiteral().getValue()).intValue();
+    } else {
+      _entries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Deserializes the first {@code length} BYTES values of the block as sketches and adds them to the running list.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+    if (storedType != FieldSpec.DataType.BYTES) {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+    byte[][] bytesValues = blockValSet.getBytesValuesSV();
+    try {
+      // Read only the first `length` entries, as the group-by variants do, instead of streaming the whole array
+      List<CompactSketch<IntegerSummary>> sketches =
+          Arrays.stream(bytesValues, 0, length).map(ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE::deserialize)
+              .map(Sketch::compact).collect(Collectors.toList());
+      List<CompactSketch<IntegerSummary>> existing = aggregationResultHolder.getResult();
+      aggregationResultHolder.setValue(existing == null ? sketches : merge(existing, sketches));
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+    }
+  }
+
+  /**
+   * Deserializes each of the first {@code length} BYTES values and appends it to the list held for its group key.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+
+    // Treat BYTES value as serialized Integer Tuple Sketch
+    FieldSpec.DataType storedType = blockValSet.getValueType().getStoredType();
+
+    if (storedType == FieldSpec.DataType.BYTES) {
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      try {
+        for (int i = 0; i < length; i++) {
+          byte[] value = bytesValues[i];
+          int groupKey = groupKeyArray[i];
+          CompactSketch<IntegerSummary> newSketch =
+              ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+          if (groupByResultHolder.getResult(groupKey) == null) {
+            ArrayList<CompactSketch<IntegerSummary>> newList = new ArrayList<>();
+            newList.add(newSketch);
+            groupByResultHolder.setValueForKey(groupKey, newList);
+          } else {
+            groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+          }
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
+      }
+    } else {
+      throw new IllegalStateException("Illegal data type for " + getType() + " aggregation function: " + storedType);
+    }
+  }
+
+  /**
+   * Deserializes each of the first {@code length} BYTES values and appends it to the list of every group key of its row.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] valueArray = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int i = 0; i < length; i++) {
+      byte[] value = valueArray[i];
+      CompactSketch<IntegerSummary> newSketch =
+          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
+      for (int groupKey : groupKeysArray[i]) {
+        if (groupByResultHolder.getResult(groupKey) == null) {
+          // Wrap in an ArrayList: later rows append to this list, and a bare singletonList rejects add()
+          groupByResultHolder.setValueForKey(groupKey, new ArrayList<>(Collections.singletonList(newSketch)));
+        } else {
+          groupByResultHolder.<List<CompactSketch<IntegerSummary>>>getResult(groupKey).add(newSketch);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> extractGroupByResult(GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  @Override
+  public List<CompactSketch<IntegerSummary>> merge(List<CompactSketch<IntegerSummary>> intermediateResult1,
+      List<CompactSketch<IntegerSummary>> intermediateResult2) {
+    if (intermediateResult1 == null && intermediateResult2 != null) {
+      return intermediateResult2;
+    } else if (intermediateResult1 != null && intermediateResult2 == null) {
+      return intermediateResult1;
+    } else if (intermediateResult1 == null && intermediateResult2 == null) {
+      return new ArrayList<>(0);
+    }
+    ArrayList<CompactSketch<IntegerSummary>> merged =
+        new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
+    merged.addAll(intermediateResult1);
+    merged.addAll(intermediateResult2);
+    return merged;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+
+  /**
+   * Unions the collected sketches and returns the serialized result as a Base64 string, or null for a null input.
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      return null;
+    }
+    Union<IntegerSummary> union = new Union<>(_entries, _setOps);
+    integerSummarySketches.forEach(union::union);
+    return Base64.getEncoder().encodeToString(union.getResult().toByteArray());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
new file mode 100644
index 0000000000..0167c7a0cf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumValuesIntegerTupleSketchAggregationFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.List;
+import org.apache.datasketches.tuple.CompactSketch;
+import org.apache.datasketches.tuple.SketchIterator;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+/**
+ * Estimates the sum of the integer summary values held by a set of Integer Tuple Sketches. The sketches are
+ * unioned, the summaries retained by the union are added up, and that total is divided by the union's theta.
+ */
+public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
+
+  public SumValuesIntegerTupleSketchAggregationFunction(List<ExpressionContext> arguments, IntegerSummary.Mode mode) {
+    super(arguments, mode);
+  }
+
+  // TODO if extra aggregation modes are supported, make this switch
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+
+  /**
+   * Unions all intermediate sketches and returns the estimated sum of their summary values, rounded to the
+   * nearest long. Returns {@code null} when there is no intermediate result.
+   */
+  @Override
+  public Comparable extractFinalResult(List<CompactSketch<IntegerSummary>> integerSummarySketches) {
+    if (integerSummarySketches == null) {
+      return null;
+    }
+    Union<IntegerSummary> merger = new Union<>(_entries, _setOps);
+    for (CompactSketch<IntegerSummary> sketch : integerSummarySketches) {
+      merger.union(sketch);
+    }
+    CompactSketch<IntegerSummary> unioned = merger.getResult();
+    double retainedSum = 0.0;
+    SketchIterator<IntegerSummary> it = unioned.iterator();
+    while (it.next()) {
+      retainedSum += it.getSummary().getValue();
+    }
+    // Scale the sum over the retained entries by 1/theta to estimate the sum over all keys
+    return Math.round(retainedSum / unioned.getTheta());
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
new file mode 100644
index 0000000000..8bdf7f8a86
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.processing.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+
+
+/**
+ * Merges two serialized Integer Tuple Sketches into a single serialized sketch, using the
+ * {@link IntegerSummary.Mode} given at construction for the summary set operations.
+ */
+public class IntegerTupleSketchAggregator implements ValueAggregator {
+  private final IntegerSummary.Mode _mode;
+
+  public IntegerTupleSketchAggregator(IntegerSummary.Mode mode) {
+    _mode = mode;
+  }
+
+  /**
+   * Unions two serialized sketches.
+   *
+   * @param value1 first serialized sketch; must be a {@code byte[]}
+   * @param value2 second serialized sketch; must be a {@code byte[]}
+   * @return the serialized union of both sketches
+   */
+  @Override
+  public Object aggregate(Object value1, Object value2) {
+    Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
+    Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
+    // The same mode is passed for both summary set operations; only a union is performed here
+    IntegerSummarySetOperations setOps = new IntegerSummarySetOperations(_mode, _mode);
+    Sketch<IntegerSummary> result = new Union<>(setOps).union(first, second);
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
index 54824255dd..4cd5a1ea6d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -46,6 +47,11 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchAggregator(IntegerSummary.Mode.Sum);
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index 4496bbad15..b62d363c4f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -44,6 +44,8 @@ public class SketchFunctionsTest {
     }
     Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null)), 0.0);
     Assert.assertEquals(thetaEstimate(SketchFunctions.toThetaSketch(null, 1024)), 0.0);
+    Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object()));
+    Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object(), 1024));
   }
 
   private long hllEstimate(byte[] bytes) {
@@ -59,4 +61,25 @@ public class SketchFunctionsTest {
     Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null)), 0);
     Assert.assertEquals(hllEstimate(SketchFunctions.toHLL(null, 8)), 0);
   }
+
+  // Deserializes an integer tuple sketch and returns its estimate
+  private double intTupleEstimate(byte[] bytes) {
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes).getEstimate();
+  }
+
+  @Test
+  public void intTupleSumCreation() {
+    // A single key yields an estimate of 1, with or without an explicit lgK
+    for (Object i : _inputs) {
+      Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(i, 1)), 1.0d);
+      Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(i, 1, 16)), 1.0d);
+    }
+    // A null key yields an estimate of 0
+    Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1)), 0.0d);
+    Assert.assertEquals(intTupleEstimate(SketchFunctions.toIntegerSumTupleSketch(null, 1, 16)), 0.0d);
+    // Unsupported key types are rejected; lgK is kept at 16, as above, so that only the key type is under test
+    Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1));
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> SketchFunctions.toIntegerSumTupleSketch(new Object(), 1, 16));
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java
new file mode 100644
index 0000000000..b9c52bf958
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountIntegerSumTupleSketchStarTreeV2Test.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.startree.v2;
+
+import java.util.Random;
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.local.aggregator.IntegerTupleSketchValueAggregator;
+import org.apache.pinot.segment.local.aggregator.ValueAggregator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Star-tree test for Integer Tuple Sketches in {@code Sum} mode: raw values are serialized single-entry
+ * sketches, and aggregated results are compared by their estimates.
+ */
+public class DistinctCountIntegerSumTupleSketchStarTreeV2Test
+    extends BaseStarTreeV2Test<byte[], Sketch<IntegerSummary>> {
+
+  @Override
+  ValueAggregator<byte[], Sketch<IntegerSummary>> getValueAggregator() {
+    return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+  }
+
+  @Override
+  DataType getRawValueType() {
+    return DataType.BYTES;
+  }
+
+  @Override
+  byte[] getRandomRawValue(Random random) {
+    // One random key with one random value, both drawn from [0, 100), serialized in compact form
+    IntegerSketch sketch = new IntegerSketch(4, IntegerSummary.Mode.Sum);
+    int key = random.nextInt(100);
+    int value = random.nextInt(100);
+    sketch.update(key, value);
+    return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(sketch.compact());
+  }
+
+  @Override
+  void assertAggregatedValue(Sketch<IntegerSummary> starTreeResult, Sketch<IntegerSummary> nonStarTreeResult) {
+    // Only the estimates are compared, not the summaries
+    assertEquals(starTreeResult.getEstimate(), nonStarTreeResult.getEstimate());
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
new file mode 100644
index 0000000000..1440e738d1
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregator.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.Union;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
+import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Value aggregator that unions Integer Tuple Sketches; raw values are serialized sketches.
+ * Summaries are combined using the {@link IntegerSummary.Mode} given at construction.
+ */
+public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
+  public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
+
+  // Largest serialized size observed so far; updated whenever a value is initialized or merged
+  private int _maxByteSize;
+
+  private final IntegerSummary.Mode _mode;
+
+  public IntegerTupleSketchValueAggregator(IntegerSummary.Mode mode) {
+    _mode = mode;
+  }
+
+  // NOTE(review): always reports DISTINCTCOUNTTUPLESKETCH, whichever tuple sketch function this aggregator was
+  // created for - confirm that callers do not rely on the exact type
+  @Override
+  public AggregationFunctionType getAggregationType() {
+    return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH;
+  }
+
+  @Override
+  public DataType getAggregatedValueType() {
+    return AGGREGATED_VALUE_TYPE;
+  }
+
+  // Unions two sketches into a compact sketch and records the serialized size of the result
+  private Sketch<IntegerSummary> unionAndTrackSize(Sketch<IntegerSummary> a, Sketch<IntegerSummary> b) {
+    IntegerSummarySetOperations setOps = new IntegerSummarySetOperations(_mode, _mode);
+    Sketch<IntegerSummary> result = new Union<>(setOps).union(a, b).compact();
+    // Measure the bytes serializeAggregatedValue() produces so the tracked maximum matches what gets stored
+    _maxByteSize = Math.max(_maxByteSize, serializeAggregatedValue(result).length);
+    return result;
+  }
+
+  @Override
+  public Sketch<IntegerSummary> getInitialAggregatedValue(byte[] rawValue) {
+    Sketch<IntegerSummary> initialValue = deserializeAggregatedValue(rawValue);
+    // The raw bytes are the serialized form of the initial value
+    _maxByteSize = Math.max(_maxByteSize, rawValue.length);
+    return initialValue;
+  }
+
+  @Override
+  public Sketch<IntegerSummary> applyRawValue(Sketch<IntegerSummary> value, byte[] rawValue) {
+    return unionAndTrackSize(value, deserializeAggregatedValue(rawValue));
+  }
+
+  @Override
+  public Sketch<IntegerSummary> applyAggregatedValue(Sketch<IntegerSummary> value,
+      Sketch<IntegerSummary> aggregatedValue) {
+    return unionAndTrackSize(value, aggregatedValue);
+  }
+
+  // Clones by round-tripping the value through its serialized form
+  @Override
+  public Sketch<IntegerSummary> cloneAggregatedValue(Sketch<IntegerSummary> value) {
+    return deserializeAggregatedValue(serializeAggregatedValue(value));
+  }
+
+  @Override
+  public int getMaxAggregatedValueByteSize() {
+    return _maxByteSize;
+  }
+
+  @Override
+  public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
+    return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);
+  }
+
+  @Override
+  public Sketch<IntegerSummary> deserializeAggregatedValue(byte[] bytes) {
+    return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(bytes);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
index aa4bdb410b..b4f90c4952 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/ValueAggregatorFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.aggregator;
 
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
@@ -66,6 +67,11 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return new DistinctCountThetaSketchValueAggregator();
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
     }
@@ -107,6 +113,11 @@ public class ValueAggregatorFactory {
       case DISTINCTCOUNTTHETASKETCH:
       case DISTINCTCOUNTRAWTHETASKETCH:
         return DistinctCountThetaSketchValueAggregator.AGGREGATED_VALUE_TYPE;
+      case DISTINCTCOUNTTUPLESKETCH:
+      case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH:
+      case AVGVALUEINTEGERSUMTUPLESKETCH:
+      case SUMVALUESINTEGERSUMTUPLESKETCH:
+        return IntegerTupleSketchValueAggregator.AGGREGATED_VALUE_TYPE;
       default:
         throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index 289715680b..1ed3a3e341 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.apache.datasketches.tuple.aninteger.IntegerSummaryDeserializer;
 import org.apache.pinot.segment.local.customobject.AvgPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
@@ -228,6 +230,29 @@ public class CustomSerDeUtils {
     }
   };
 
+  // Ser/de for Integer Tuple Sketches; sketches are always written in compact form
+  public static final ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>> DATA_SKETCH_INT_TUPLE_SER_DE =
+      new ObjectSerDe<org.apache.datasketches.tuple.Sketch<IntegerSummary>>() {
+        @Override
+        public byte[] serialize(org.apache.datasketches.tuple.Sketch<IntegerSummary> value) {
+          return value.compact().toByteArray();
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(byte[] bytes) {
+          Memory memory = Memory.wrap(bytes);
+          return org.apache.datasketches.tuple.Sketches.heapifySketch(memory, new IntegerSummaryDeserializer());
+        }
+
+        @Override
+        public org.apache.datasketches.tuple.Sketch<IntegerSummary> deserialize(ByteBuffer byteBuffer) {
+          // Drain the remaining bytes of the buffer and reuse the byte[] path
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          return deserialize(bytes);
+        }
+      };
+
   public static final ObjectSerDe<RoaringBitmap> ROARING_BITMAP_SER_DE = new ObjectSerDe<RoaringBitmap>() {
 
     @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
new file mode 100644
index 0000000000..d108d799b0
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/IntegerTupleSketchValueAggregatorTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.aggregator;
+
+import org.apache.datasketches.tuple.Sketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSketch;
+import org.apache.datasketches.tuple.aninteger.IntegerSummary;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class IntegerTupleSketchValueAggregatorTest {
+
+  // Builds a Sum-mode sketch (lgK = 16) holding a single key/value entry
+  private IntegerSketch sketchOf(String key, int value) {
+    IntegerSketch sketch = new IntegerSketch(16, IntegerSummary.Mode.Sum);
+    sketch.update(key, value);
+    return sketch;
+  }
+
+  // Serialized compact form of a single-entry sketch
+  private byte[] sketchContaining(String key, int value) {
+    return sketchOf(key, value).compact().toByteArray();
+  }
+
+  @Test
+  public void initialShouldParseASketch() {
+    IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+    assertEquals(agg.getInitialAggregatedValue(sketchContaining("hello world", 1)).getEstimate(), 1.0);
+  }
+
+  @Test
+  public void applyAggregatedValueShouldUnion() {
+    IntegerSketch s1 = sketchOf("a", 1);
+    IntegerSketch s2 = sketchOf("b", 1);
+    IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+    Sketch<IntegerSummary> merged = agg.applyAggregatedValue(s1, s2);
+    assertEquals(merged.getEstimate(), 2.0);
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length);
+  }
+
+  @Test
+  public void applyRawValueShouldUnion() {
+    IntegerSketch s1 = sketchOf("a", 1);
+    IntegerSketch s2 = sketchOf("b", 1);
+    IntegerTupleSketchValueAggregator agg = new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
+    Sketch<IntegerSummary> merged = agg.applyRawValue(s1, agg.serializeAggregatedValue(s2));
+    assertEquals(merged.getEstimate(), 2.0);
+
+    // and should update the max size
+    assertEquals(agg.getMaxAggregatedValueByteSize(), agg.serializeAggregatedValue(merged).length);
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index a9d2085c8f..7b2a02d666 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -72,6 +72,15 @@ public enum AggregationFunctionType {
   KURTOSIS("kurtosis"),
   FOURTHMOMENT("fourthmoment"),
 
+  // DataSketches Tuple Sketch support
+  DISTINCTCOUNTTUPLESKETCH("distinctCountTupleSketch"),
+
+  // DataSketches Tuple Sketch support for Integer based Tuple Sketches
+  DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH("distinctCountRawIntegerSumTupleSketch"),
+
+  SUMVALUESINTEGERSUMTUPLESKETCH("sumValuesIntegerSumTupleSketch"),
+  AVGVALUEINTEGERSUMTUPLESKETCH("avgValueIntegerSumTupleSketch"),
+
   // Geo aggregation functions
   STUNION("STUnion"),
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 585c56520c..4dc7496c3a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -96,6 +96,9 @@ public class CommonConstants {
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
     public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
 
+    // Default lgK for Tuple Sketches; presumably log2 of the nominal entries (2^16 = 65536, as for Theta above)
+    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
+
     // Whether to rewrite DistinctCount to DistinctCountBitmap
     public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override";
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org