You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/12/05 23:22:36 UTC

(pinot) branch master updated: Theta Sketch Aggregation Enhancements (#12042)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a9e319964f Theta Sketch Aggregation Enhancements (#12042)
a9e319964f is described below

commit a9e319964f642ff86f2c4a6122058bb033a2475a
Author: David Cromberge <da...@gmail.com>
AuthorDate: Tue Dec 5 23:22:30 2023 +0000

    Theta Sketch Aggregation Enhancements (#12042)
    
    * Theta Sketch Aggregation Enhancements
    
    Introduces additional parameters to the DistinctCountThetaSketch aggregation function that
    give the end-user more control over how sketches are merged.  The defaults are chosen so
    that the behaviour is unchanged from the current implementation.
    
    Furthermore, a custom accumulator object is added so that pairwise union
    operations are avoided as much as possible.  Instead, sketches are accumulated
    and merged once a configurable threshold is reached.
    
    * Use correct naming convention for private variable
    
    * Fix flaky test edge-case
    
    * Decrease default constant value
    
    This better aligns to the default nominal values parameter
    that is used in the query aggregation function.
    
    * Attempt 2: Simplify implementation
    
    Removes the intermediate array list used to buffer/accumulate sketch
    elements.  Instead, inputs are fed directly to the underlying
    union.  This keeps the memory usage of the merge under control.
    
    * Rename sampling probability parameter
    
    * Minor code improvements
    
    * Revert "Attempt 2: Simplify implementation"
    
    This reverts commit 2ed38f395ae1e3071637f58da2386e867dbc80e3.
    
    * Add toString methods for supported sketches to enable debugging
    
    * Additional inline commentary on early stop optimization.
    
    * Refactor serializer for Theta to remove temp variables
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |  43 ++++-
 .../core/function/scalar/SketchFunctions.java      |  10 +
 ...inctCountRawThetaSketchAggregationFunction.java |  19 +-
 ...istinctCountThetaSketchAggregationFunction.java | 202 ++++++++++++++-------
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  53 ++++++
 .../core/function/scalar/SketchFunctionsTest.java  |  22 +++
 .../DistinctCountThetaSketchQueriesTest.java       |  32 ++--
 .../local/customobject/ThetaSketchAccumulator.java | 141 ++++++++++++++
 .../segment/local/utils/CustomSerDeUtils.java      |   9 +-
 ...istinctCountThetaSketchValueAggregatorTest.java |  13 ++
 .../customobject/ThetaSketchAccumulatorTest.java   | 104 +++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +-
 12 files changed, 554 insertions(+), 98 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 903ef712db..14aa2d2f10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -88,6 +88,7 @@ import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.VarianceTuple;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -154,7 +155,8 @@ public class ObjectSerDeUtils {
     LongArrayList(43),
     FloatArrayList(44),
     StringArrayList(45),
-    UltraLogLog(46);
+    UltraLogLog(46),
+    ThetaSketchAccumulator(47);
 
     private final int _value;
 
@@ -273,6 +275,8 @@ public class ObjectSerDeUtils {
         return ObjectType.CompressedProbabilisticCounting;
       } else if (value instanceof UltraLogLog) {
         return ObjectType.UltraLogLog;
+      } else if (value instanceof ThetaSketchAccumulator) {
+        return ObjectType.ThetaSketchAccumulator;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -1125,9 +1129,12 @@ public class ObjectSerDeUtils {
 
     @Override
     public byte[] serialize(Sketch value) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-      return value.compact(false, null).toByteArray();
+      // The serializer should respect existing ordering to enable "early stop"
+      // optimisations on unions.
+      if (!value.isCompact()) {
+        return value.compact(value.isOrdered(), null).toByteArray();
+      }
+      return value.toByteArray();
     }
 
     @Override
@@ -1580,6 +1587,33 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<ThetaSketchAccumulator> DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE =
+      new ObjectSerDe<ThetaSketchAccumulator>() {
+
+        @Override
+        public byte[] serialize(ThetaSketchAccumulator thetaSketchBuffer) {
+          Sketch sketch = thetaSketchBuffer.getResult();
+          return sketch.toByteArray();
+        }
+
+        @Override
+        public ThetaSketchAccumulator deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        // Note: The accumulator is designed to serialize as a sketch and should
+        // not be deserialized in practice.
+        @Override
+        public ThetaSketchAccumulator deserialize(ByteBuffer byteBuffer) {
+          ThetaSketchAccumulator thetaSketchAccumulator = new ThetaSketchAccumulator();
+          byte[] bytes = new byte[byteBuffer.remaining()];
+          byteBuffer.get(bytes);
+          Sketch sketch = Sketch.wrap(Memory.wrap(bytes));
+          thetaSketchAccumulator.apply(sketch);
+          return thetaSketchAccumulator;
+        }
+      };
+
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -1630,6 +1664,7 @@ public class ObjectSerDeUtils {
       FLOAT_ARRAY_LIST_SER_DE,
       STRING_ARRAY_LIST_SER_DE,
       ULTRA_LOG_LOG_OBJECT_SER_DE,
+      DATA_SKETCH_SKETCH_ACCUMULATOR_SER_DE,
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
index 74e35e8bb7..90e313edb2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/function/scalar/SketchFunctions.java
@@ -261,6 +261,11 @@ public class SketchFunctions {
     return diff.getResult(false, null, false);
   }
 
+  @ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
+  public static String thetaSketchToString(Object sketchObject) {
+    return asThetaSketch(sketchObject).toString();
+  }
+
   private static Sketch thetaSketchUnionVar(Object... sketchObjects) {
     Union union = SET_OPERATION_BUILDER.buildUnion();
     for (Object sketchObj : sketchObjects) {
@@ -417,6 +422,11 @@ public class SketchFunctions {
     return cpcSketchUnionVar(o1, o2, o3, o4, o5);
   }
 
+  @ScalarFunction(names = {"cpcSketchToString", "cpc_sketch_to_string"})
+  public static String cpcSketchToString(Object sketchObject) {
+    return asCpcSketch(sketchObject).toString();
+  }
+
   /**
    * Create a CPC Sketch containing the input, with a configured nominal entries
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
index cd75fa3807..00d6ec2906 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import java.util.ArrayList;
 import java.util.Base64;
 import java.util.List;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -47,11 +49,18 @@ public class DistinctCountRawThetaSketchAggregationFunction extends DistinctCoun
   }
 
   @Override
-  public String extractFinalResult(List<Sketch> sketches) {
-    Sketch sketch = evaluatePostAggregationExpression(sketches);
+  public String extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
+    int numAccumulators = accumulators.size();
+    List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
 
-    // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-    //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-    return Base64.getEncoder().encodeToString(sketch.compact(false, null).toByteArray());
+    for (ThetaSketchAccumulator accumulator : accumulators) {
+      accumulator.setOrdered(_intermediateOrdering);
+      accumulator.setThreshold(_accumulatorThreshold);
+      accumulator.setSetOperationBuilder(_setOperationBuilder);
+      mergedSketches.add(accumulator.getResult());
+    }
+
+    Sketch sketch = evaluatePostAggregationExpression(mergedSketches);
+    return Base64.getEncoder().encodeToString(sketch.toByteArray());
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index f3a9adfda6..0fea53db34 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.common.ResizeFactor;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
@@ -48,6 +49,7 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -68,29 +70,36 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
  *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
  *   </li>
  * </ul>
- * Currently there is only 1 parameter for the function:
+ * Currently, there are 5 parameters to the function:
  * <ul>
  *   <li>
  *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ *     resizeFactor: Controls the size multiple that affects how fast the internal cache grows (Default 2^3=8)
+ *     samplingProbability: Sets the upfront uniform sampling probability, p. (Default 1.0)
+ *     intermediateOrdering: Whether compacted sketches should be ordered. (Default false)
+ *     accumulatorThreshold: How many sketches should be kept in memory before merging. (Default 2)
  *   </li>
  * </ul>
  * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountThetaSketchAggregationFunction
-    extends BaseSingleInputAggregationFunction<List<Sketch>, Comparable> {
+    extends BaseSingleInputAggregationFunction<List<ThetaSketchAccumulator>, Comparable> {
   private static final String SET_UNION = "setunion";
   private static final String SET_INTERSECT = "setintersect";
   private static final String SET_DIFF = "setdiff";
   private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
-  private static final Sketch EMPTY_SKETCH = new UpdateSketchBuilder().build().compact();
+  private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
+  private static final boolean DEFAULT_INTERMEDIATE_ORDERING = false;
 
   private final List<ExpressionContext> _inputExpressions;
   private final boolean _includeDefaultSketch;
   private final List<FilterEvaluator> _filterEvaluators;
   private final ExpressionContext _postAggregationExpression;
   private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
-  private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  protected final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  protected boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+  protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
 
   public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
     super(arguments.get(0));
@@ -102,9 +111,22 @@ public class DistinctCountThetaSketchAggregationFunction
       Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
           "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
       Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
+      // Allows the user to trade-off memory usage for merge CPU; higher values use more memory
+      _accumulatorThreshold = parameters.getAccumulatorThreshold();
+      // Ordering controls whether intermediate compact sketches are ordered in set operations
+      _intermediateOrdering = parameters.getIntermediateOrdering();
+      // Nominal entries controls sketch accuracy and size
       int nominalEntries = parameters.getNominalEntries();
       _updateSketchBuilder.setNominalEntries(nominalEntries);
       _setOperationBuilder.setNominalEntries(nominalEntries);
+      // Sampling probability sets the initial value of Theta, defaults to 1.0
+      float p = parameters.getSamplingProbability();
+      _setOperationBuilder.setP(p);
+      _updateSketchBuilder.setP(p);
+      // Resize factor controls the size multiple that affects how fast the internal cache grows
+      ResizeFactor rf = parameters.getResizeFactor();
+      _setOperationBuilder.setResizeFactor(rf);
+      _updateSketchBuilder.setResizeFactor(rf);
     }
 
     if (numArguments < 4) {
@@ -401,20 +423,20 @@ public class DistinctCountThetaSketchAggregationFunction
       }
     } else {
       // Serialized sketch
-      List<Union> unions = getUnions(aggregationResultHolder);
+      List<ThetaSketchAccumulator> thetaSketchAccumulators = getUnions(aggregationResultHolder);
       Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
       if (_includeDefaultSketch) {
-        Union defaultUnion = unions.get(0);
+        ThetaSketchAccumulator defaultThetaAccumulator = thetaSketchAccumulators.get(0);
         for (Sketch sketch : sketches) {
-          defaultUnion.union(sketch);
+          defaultThetaAccumulator.apply(sketch);
         }
       }
       for (int i = 0; i < numFilters; i++) {
         FilterEvaluator filterEvaluator = _filterEvaluators.get(i);
-        Union union = unions.get(i + 1);
+        ThetaSketchAccumulator thetaSketchAccumulator = thetaSketchAccumulators.get(i + 1);
         for (int j = 0; j < length; j++) {
           if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
-            union.union(sketches[j]);
+            thetaSketchAccumulator.apply(sketches[j]);
           }
         }
       }
@@ -631,14 +653,14 @@ public class DistinctCountThetaSketchAggregationFunction
       // Serialized sketch
       Sketch[] sketches = deserializeSketches((byte[][]) valueArrays[0], length);
       for (int i = 0; i < length; i++) {
-        List<Union> unions = getUnions(groupByResultHolder, groupKeyArray[i]);
+        List<ThetaSketchAccumulator> thetaSketchAccumulators = getUnions(groupByResultHolder, groupKeyArray[i]);
         Sketch sketch = sketches[i];
         if (_includeDefaultSketch) {
-          unions.get(0).union(sketch);
+          thetaSketchAccumulators.get(0).apply(sketch);
         }
         for (int j = 0; j < numFilters; j++) {
           if (_filterEvaluators.get(j).evaluate(singleValues, valueTypes, valueArrays, i)) {
-            unions.get(j + 1).union(sketch);
+            thetaSketchAccumulators.get(j + 1).apply(sketch);
           }
         }
       }
@@ -907,7 +929,7 @@ public class DistinctCountThetaSketchAggregationFunction
       if (_includeDefaultSketch) {
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            getUnions(groupByResultHolder, groupKey).get(0).union(sketches[i]);
+            getUnions(groupByResultHolder, groupKey).get(0).apply(sketches[i]);
           }
         }
       }
@@ -916,7 +938,7 @@ public class DistinctCountThetaSketchAggregationFunction
         for (int j = 0; j < length; j++) {
           if (filterEvaluator.evaluate(singleValues, valueTypes, valueArrays, j)) {
             for (int groupKey : groupKeysArray[i]) {
-              getUnions(groupByResultHolder, groupKey).get(i + 1).union(sketches[i]);
+              getUnions(groupByResultHolder, groupKey).get(i + 1).apply(sketches[i]);
             }
           }
         }
@@ -925,57 +947,70 @@ public class DistinctCountThetaSketchAggregationFunction
   }
 
   @Override
-  public List<Sketch> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+  public List<ThetaSketchAccumulator> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
     List result = aggregationResultHolder.getResult();
     if (result == null) {
       int numSketches = _filterEvaluators.size() + 1;
-      List<Sketch> sketches = new ArrayList<>(numSketches);
+      List<ThetaSketchAccumulator> sketches = new ArrayList<>(numSketches);
       for (int i = 0; i < numSketches; i++) {
-        sketches.add(EMPTY_SKETCH);
+        sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold));
       }
       return sketches;
     }
 
     if (result.get(0) instanceof Sketch) {
-      return result;
+      int numSketches = result.size();
+      ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
+      for (Object o : result) {
+        ThetaSketchAccumulator thetaSketchAccumulator =
+            new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+        thetaSketchAccumulator.apply((Sketch) o);
+        thetaSketchAccumulators.add(thetaSketchAccumulator);
+      }
+      return thetaSketchAccumulators;
     } else {
-      return convertToSketches(result);
+      return result;
     }
   }
 
   @Override
-  public List<Sketch> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+  public List<ThetaSketchAccumulator> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
     List result = groupByResultHolder.getResult(groupKey);
+
     if (result.get(0) instanceof Sketch) {
-      return result;
-    } else {
-      return convertToSketches(result);
+      int numSketches = result.size();
+      ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
+      for (Object o : result) {
+        ThetaSketchAccumulator thetaSketchAccumulator =
+            new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+        thetaSketchAccumulator.apply((Sketch) o);
+        thetaSketchAccumulators.add(thetaSketchAccumulator);
+      }
+      return thetaSketchAccumulators;
     }
+
+    return result;
   }
 
   @Override
-  public List<Sketch> merge(List<Sketch> sketches1, List<Sketch> sketches2) {
-    int numSketches = sketches1.size();
-    List<Sketch> mergedSketches = new ArrayList<>(numSketches);
-    for (int i = 0; i < numSketches; i++) {
-      Sketch sketch1 = sketches1.get(i);
-      Sketch sketch2 = sketches2.get(i);
-      if (sketch1.isEmpty()) {
-        mergedSketches.add(sketch2);
+  public List<ThetaSketchAccumulator> merge(List<ThetaSketchAccumulator> acc1, List<ThetaSketchAccumulator> acc2) {
+    int numAccumulators = acc1.size();
+    List<ThetaSketchAccumulator> mergedAccumulators = new ArrayList<>(numAccumulators);
+    for (int i = 0; i < numAccumulators; i++) {
+      ThetaSketchAccumulator thetaSketchAccumulator1 = acc1.get(i);
+      ThetaSketchAccumulator thetaSketchAccumulator2 = acc2.get(i);
+      if (thetaSketchAccumulator1.isEmpty()) {
+        mergedAccumulators.add(thetaSketchAccumulator2);
         continue;
       }
-      if (sketch2.isEmpty()) {
-        mergedSketches.add(sketch1);
+      if (thetaSketchAccumulator2.isEmpty()) {
+        mergedAccumulators.add(thetaSketchAccumulator1);
         continue;
       }
-      Union union = _setOperationBuilder.buildUnion();
-      union.union(sketch1);
-      union.union(sketch2);
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-      mergedSketches.add(union.getResult(false, null));
+      thetaSketchAccumulator1.merge(thetaSketchAccumulator2);
+      mergedAccumulators.add(thetaSketchAccumulator1);
     }
-    return mergedSketches;
+    return mergedAccumulators;
   }
 
   @Override
@@ -989,8 +1024,18 @@ public class DistinctCountThetaSketchAggregationFunction
   }
 
   @Override
-  public Comparable extractFinalResult(List<Sketch> sketches) {
-    return Math.round(evaluatePostAggregationExpression(_postAggregationExpression, sketches).getEstimate());
+  public Comparable extractFinalResult(List<ThetaSketchAccumulator> accumulators) {
+    int numAccumulators = accumulators.size();
+    List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
+
+    for (ThetaSketchAccumulator accumulator : accumulators) {
+      accumulator.setOrdered(_intermediateOrdering);
+      accumulator.setThreshold(_accumulatorThreshold);
+      accumulator.setSetOperationBuilder(_setOperationBuilder);
+      mergedSketches.add(accumulator.getResult());
+    }
+
+    return Math.round(evaluatePostAggregationExpression(_postAggregationExpression, mergedSketches).getEstimate());
   }
 
   /**
@@ -1172,8 +1217,8 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Returns the Union list from the result holder or creates a new one if it does not exist.
    */
-  private List<Union> getUnions(AggregationResultHolder aggregationResultHolder) {
-    List<Union> unions = aggregationResultHolder.getResult();
+  private List<ThetaSketchAccumulator> getUnions(AggregationResultHolder aggregationResultHolder) {
+    List<ThetaSketchAccumulator> unions = aggregationResultHolder.getResult();
     if (unions == null) {
       unions = buildUnions();
       aggregationResultHolder.setValue(unions);
@@ -1196,8 +1241,8 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Returns the Union list for the given group key or creates a new one if it does not exist.
    */
-  private List<Union> getUnions(GroupByResultHolder groupByResultHolder, int groupKey) {
-    List<Union> unions = groupByResultHolder.getResult(groupKey);
+  private List<ThetaSketchAccumulator> getUnions(GroupByResultHolder groupByResultHolder, int groupKey) {
+    List<ThetaSketchAccumulator> unions = groupByResultHolder.getResult(groupKey);
     if (unions == null) {
       unions = buildUnions();
       groupByResultHolder.setValueForKey(groupKey, unions);
@@ -1220,11 +1265,13 @@ public class DistinctCountThetaSketchAggregationFunction
   /**
    * Builds the Union list.
    */
-  private List<Union> buildUnions() {
+  private List<ThetaSketchAccumulator> buildUnions() {
     int numUnions = _filterEvaluators.size() + 1;
-    List<Union> unions = new ArrayList<>(numUnions);
+    List<ThetaSketchAccumulator> unions = new ArrayList<>(numUnions);
     for (int i = 0; i < numUnions; i++) {
-      unions.add(_setOperationBuilder.buildUnion());
+      ThetaSketchAccumulator thetaSketchAccumulator =
+          new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+      unions.add(thetaSketchAccumulator);
     }
     return unions;
   }
@@ -1240,20 +1287,6 @@ public class DistinctCountThetaSketchAggregationFunction
     return sketches;
   }
 
-  /**
-   * Converts the given Unions to Sketches.
-   */
-  private List<Sketch> convertToSketches(List<Union> unions) {
-    int numUnions = unions.size();
-    List<Sketch> sketches = new ArrayList<>(numUnions);
-    for (Union union : unions) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-      sketches.add(union.getResult(false, null));
-    }
-    return sketches;
-  }
-
   /**
    * Evaluates the post-aggregation expression.
    */
@@ -1269,8 +1302,6 @@ public class DistinctCountThetaSketchAggregationFunction
       return sketches.get(extractSketchId(expression.getIdentifier()));
     }
 
-    // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-    //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
     FunctionContext function = expression.getFunction();
     String functionName = function.getFunctionName();
     List<ExpressionContext> arguments = function.getArguments();
@@ -1280,32 +1311,41 @@ public class DistinctCountThetaSketchAggregationFunction
         for (ExpressionContext argument : arguments) {
           union.union(evaluatePostAggregationExpression(argument, sketches));
         }
-        return union.getResult(false, null);
+        return union.getResult(_intermediateOrdering, null);
       case SET_INTERSECT:
         Intersection intersection = _setOperationBuilder.buildIntersection();
         for (ExpressionContext argument : arguments) {
           intersection.intersect(evaluatePostAggregationExpression(argument, sketches));
         }
-        return intersection.getResult(false, null);
+        return intersection.getResult(_intermediateOrdering, null);
       case SET_DIFF:
         AnotB diff = _setOperationBuilder.buildANotB();
         diff.setA(evaluatePostAggregationExpression(arguments.get(0), sketches));
         diff.notB(evaluatePostAggregationExpression(arguments.get(1), sketches));
-        return diff.getResult(false, null, false);
+        return diff.getResult(_intermediateOrdering, null, false);
       default:
         throw new IllegalStateException();
     }
   }
 
   /**
-   * Helper class to wrap the theta-sketch parameters.
+   * Helper class to wrap the theta-sketch parameters.  The initial values for the parameters are set to the
+   * same defaults in the Apache Datasketches library.
    */
   private static class Parameters {
     private static final char PARAMETER_DELIMITER = ';';
     private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
     private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+    private static final String RESIZE_FACTOR_KEY = "resizeFactor";
+    private static final String SAMPLING_PROBABILITY_KEY = "samplingProbability";
+    private static final String INTERMEDIATE_ORDERING_KEY = "intermediateOrdering";
+    private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold";
 
+    private int _resizeFactor = ResizeFactor.X8.getValue();
     private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
+    private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
+    private boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
+    private float _samplingProbability = 1.0F;
 
     Parameters(String parametersString) {
       StringUtils.deleteWhitespace(parametersString);
@@ -1317,6 +1357,14 @@ public class DistinctCountThetaSketchAggregationFunction
         String value = keyAndValue[1];
         if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
           _nominalEntries = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(SAMPLING_PROBABILITY_KEY)) {
+          _samplingProbability = Float.parseFloat(value);
+        } else if (key.equalsIgnoreCase(RESIZE_FACTOR_KEY)) {
+          _resizeFactor = Integer.parseInt(value);
+        } else if (key.equalsIgnoreCase(INTERMEDIATE_ORDERING_KEY)) {
+          _intermediateOrdering = Boolean.parseBoolean(value);
+        } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
+          _accumulatorThreshold = Integer.parseInt(value);
         } else {
           throw new IllegalArgumentException("Invalid parameter key: " + key);
         }
@@ -1326,6 +1374,22 @@ public class DistinctCountThetaSketchAggregationFunction
     int getNominalEntries() {
       return _nominalEntries;
     }
+
+    float getSamplingProbability() {
+      return _samplingProbability;
+    }
+
+    boolean getIntermediateOrdering() {
+      return _intermediateOrdering;
+    }
+
+    int getAccumulatorThreshold() {
+      return _accumulatorThreshold;
+    }
+
+    ResizeFactor getResizeFactor() {
+      return ResizeFactor.getRF(_resizeFactor);
+    }
   }
 
   /**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8fb198474c..b9f6f985b5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -39,6 +39,10 @@ import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
 import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 import org.apache.pinot.segment.local.customobject.AvgPair;
@@ -49,6 +53,7 @@ import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
 import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.testng.annotations.Test;
@@ -471,4 +476,52 @@ public class ObjectSerDeUtilsTest {
       assertEquals(actual.getState(), ull.getState(), ERROR_MESSAGE);
     }
   }
+
+  @Test
+  public void testThetaSketch() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      // Round-trip a compact sketch of 10..109 distinct values that is randomly ordered or unordered.
+      int numValues = RANDOM.nextInt(100) + 10;
+      boolean ordered = RANDOM.nextBoolean();
+      UpdateSketch updateSketch = Sketches.updateSketchBuilder().build();
+      for (int value = 0; value < numValues; value++) {
+        updateSketch.update(value);
+      }
+      Sketch expected = updateSketch.compact(ordered, null);
+
+      byte[] serialized = ObjectSerDeUtils.serialize(expected);
+      Sketch actual = ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.DataSketch);
+
+      // Content and the ordered flag must both survive serialization.
+      assertEquals(actual.getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(actual.toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+      assertEquals(actual.isOrdered(), ordered, ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testThetaSketchAccumulator() {
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      UpdateSketch updateSketch = Sketches.updateSketchBuilder().build();
+      int numValues = RANDOM.nextInt(100) + 10;
+      boolean ordered = RANDOM.nextBoolean();
+      for (int value = 0; value < numValues; value++) {
+        updateSketch.update(value);
+      }
+      Sketch expected = updateSketch.compact(ordered, null);
+
+      // A single input stays below the threshold of 2, so the accumulator's result is that sketch itself.
+      ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(new SetOperationBuilder(), ordered, 2);
+      accumulator.apply(expected);
+
+      byte[] serialized = ObjectSerDeUtils.serialize(accumulator);
+      ThetaSketchAccumulator deserialized =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.ThetaSketchAccumulator);
+      Sketch actual = deserialized.getResult();
+
+      assertEquals(actual.getEstimate(), expected.getEstimate(), ERROR_MESSAGE);
+      assertEquals(actual.toByteArray(), expected.toByteArray(), ERROR_MESSAGE);
+      assertEquals(actual.isOrdered(), ordered, ERROR_MESSAGE);
+    }
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
index 11e31aaf0f..11662741db 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/function/scalar/SketchFunctionsTest.java
@@ -20,6 +20,10 @@ package org.apache.pinot.core.function.scalar;
 
 import com.dynatrace.hash4j.distinctcount.UltraLogLog;
 import java.math.BigDecimal;
+import org.apache.datasketches.cpc.CpcSketch;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.segment.local.utils.UltraLogLogUtils;
 import org.testng.Assert;
@@ -50,6 +54,16 @@ public class SketchFunctionsTest {
     Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toThetaSketch(new Object(), 1024));
   }
 
+  @Test
+  public void thetaThetaSketchSummary() {
+    // Checks that thetaSketchToString renders each generated sketch exactly like Sketch#toString.
+    for (Object input : _inputs) {
+      Sketch expected = Sketches.wrapSketch(Memory.wrap(SketchFunctions.toThetaSketch(input)));
+      Assert.assertEquals(SketchFunctions.thetaSketchToString(expected), expected.toString());
+    }
+    Assert.assertThrows(RuntimeException.class, () -> SketchFunctions.thetaSketchToString(new Object()));
+  }
+
   private long hllEstimate(byte[] bytes) {
     return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytes).cardinality();
   }
@@ -97,6 +111,16 @@ public class SketchFunctionsTest {
     Assert.assertThrows(IllegalArgumentException.class, () -> SketchFunctions.toCpcSketch(new Object(), 11));
   }
 
+  @Test
+  public void thetaCpcSketchToString() {
+    // Checks that cpcSketchToString renders each generated sketch exactly like CpcSketch#toString.
+    for (Object input : _inputs) {
+      CpcSketch expected = CpcSketch.heapify(Memory.wrap(SketchFunctions.toCpcSketch(input)));
+      Assert.assertEquals(SketchFunctions.cpcSketchToString(expected), expected.toString());
+    }
+    Assert.assertThrows(RuntimeException.class, () -> SketchFunctions.cpcSketchToString(new Object()));
+  }
+
   private long ullEstimate(byte[] bytes) {
     // round it to a long to make it easier to assert on
     return Math.round(ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize(bytes).getDistinctCountEstimate());
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
index 4d04539b4a..98e9b90900 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchQueriesTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.GroupByOperator;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.ThetaSketchAccumulator;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -172,13 +173,13 @@ public class DistinctCountThetaSketchQueriesTest extends BaseQueriesTest {
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 11);
     for (int i = 0; i < 11; i++) {
-      List<Sketch> sketches = (List<Sketch>) aggregationResult.get(i);
-      assertEquals(sketches.size(), 1);
-      Sketch sketch = sketches.get(0);
+      List<ThetaSketchAccumulator> accumulators = (List<ThetaSketchAccumulator>) aggregationResult.get(i);
+      assertEquals(accumulators.size(), 1);
+      ThetaSketchAccumulator accumulator = accumulators.get(0);
       if (i < 5) {
-        assertEquals(Math.round(sketch.getEstimate()), NUM_RECORDS);
+        assertEquals(Math.round(accumulator.getResult().getEstimate()), NUM_RECORDS);
       } else {
-        assertEquals(Math.round(sketch.getEstimate()), 3 * NUM_RECORDS);
+        assertEquals(Math.round(accumulator.getResult().getEstimate()), 3 * NUM_RECORDS);
       }
     }
 
@@ -220,9 +221,10 @@ public class DistinctCountThetaSketchQueriesTest extends BaseQueriesTest {
         numGroups++;
         GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
         for (int i = 0; i < 6; i++) {
-          List<Sketch> sketches = (List<Sketch>) aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
-          assertEquals(sketches.size(), 1);
-          Sketch sketch = sketches.get(0);
+          List<ThetaSketchAccumulator> accumulators =
+              (List<ThetaSketchAccumulator>) aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId);
+          assertEquals(accumulators.size(), 1);
+          Sketch sketch = accumulators.get(0).getResult();
           if (i < 5) {
             assertEquals(Math.round(sketch.getEstimate()), 1);
           } else {
@@ -279,13 +281,13 @@ public class DistinctCountThetaSketchQueriesTest extends BaseQueriesTest {
     List<Object> aggregationResult = resultsBlock.getResults();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
-    List<Sketch> sketches = (List<Sketch>) aggregationResult.get(0);
-    assertEquals(sketches.size(), 5);
-    assertTrue(sketches.get(0).isEmpty());
-    assertEquals(Math.round(sketches.get(1).getEstimate()), 300);
-    assertEquals(Math.round(sketches.get(2).getEstimate()), 450);
-    assertEquals(Math.round(sketches.get(3).getEstimate()), 175);
-    assertEquals(Math.round(sketches.get(4).getEstimate()), 100);
+    List<ThetaSketchAccumulator> accumulators = (List<ThetaSketchAccumulator>) aggregationResult.get(0);
+    assertEquals(accumulators.size(), 5);
+    assertTrue(accumulators.get(0).getResult().isEmpty());
+    assertEquals(Math.round(accumulators.get(1).getResult().getEstimate()), 300);
+    assertEquals(Math.round(accumulators.get(2).getResult().getEstimate()), 450);
+    assertEquals(Math.round(accumulators.get(3).getResult().getEstimate()), 175);
+    assertEquals(Math.round(accumulators.get(4).getResult().getEstimate()), 100);
 
     // Inter segments
     Object[] expectedResults = new Object[]{225L};
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
new file mode 100644
index 0000000000..b1cb94812c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+
+
+/**
+ * Intermediate state used by {@code DistinctCountThetaSketchAggregationFunction} which gives
+ * the end user more control over how sketches are merged for performance.
+ * The end user can set parameters that trade-off more memory usage for more pre-aggregation.
+ * This permits use of the Union "early-stop" optimisation where ordered sketches require no further
+ * processing beyond the minimum Theta value.
+ * The union operation initialises an empty "gadget" bookkeeping sketch that is updated with hashed entries
+ * that fall below the minimum Theta value for all input sketches ("Broder Rule").  When the initial
+ * Theta value is set to the minimum immediately, further gains can be realised.
+ *
+ * <p>Non-empty input sketches are buffered until {@code threshold} of them are pending; the buffer is then
+ * sorted by ascending Theta and merged into a single, lazily created {@link Union}.
+ *
+ * <p>Instances are mutable and unsynchronized; confine each one to a single thread.
+ */
+public class ThetaSketchAccumulator {
+  // Upper bound on the capacity reserved up-front for the buffer. The threshold may come from a
+  // user-supplied query parameter, so presizing to it directly could attempt a huge allocation.
+  private static final int MAX_INITIAL_CAPACITY = 64;
+
+  // Sketches received since the last merge into _union; created lazily on the first non-empty input.
+  private List<Sketch> _accumulator;
+  private boolean _ordered = false;
+  private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+  // Created lazily from _setOperationBuilder on the first call to unionAll().
+  private Union _union;
+  // Number of buffered sketches that triggers a merge into _union; validated to be non-negative.
+  private int _threshold;
+  // Total number of non-empty sketches accepted so far; not reset when the buffer is merged.
+  private int _numInputs = 0;
+
+  /**
+   * Creates an accumulator with a default {@link SetOperationBuilder}, unordered results and a threshold
+   * of 0. The parameters can be adjusted afterwards through the setters.
+   */
+  public ThetaSketchAccumulator() {
+  }
+
+  /**
+   * Creates an accumulator with explicit merge parameters.
+   *
+   * <p>Note: The accumulator is serialized as a sketch.  This means that the majority of the processing
+   * happens on serialization. Therefore, when deserialized, the values may be null and will
+   * require re-initialisation. Since the primary use case is at query time for the Broker
+   * and Server, these properties are already in memory and are re-set.
+   *
+   * @param setOperationBuilder builder used to create the underlying {@link Union}
+   * @param ordered whether results produced by the union are ordered compact sketches
+   * @param threshold number of buffered sketches that triggers a merge into the union; must be non-negative
+   * @throws IllegalArgumentException if {@code threshold} is negative
+   */
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+    _setOperationBuilder = setOperationBuilder;
+    _ordered = ordered;
+    _threshold = checkThreshold(threshold);
+  }
+
+  /**
+   * Sets whether results produced by the union are ordered compact sketches.
+   */
+  public void setOrdered(boolean ordered) {
+    _ordered = ordered;
+  }
+
+  /**
+   * Replaces the builder used to create the union. Has no effect once the union has been created.
+   */
+  public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
+    _setOperationBuilder = setOperationBuilder;
+  }
+
+  /**
+   * Sets the number of buffered sketches that triggers a merge into the union.
+   *
+   * @throws IllegalArgumentException if {@code threshold} is negative
+   */
+  public void setThreshold(int threshold) {
+    _threshold = checkThreshold(threshold);
+  }
+
+  /**
+   * Returns {@code true} if no non-empty sketch has been applied or merged yet.
+   */
+  public boolean isEmpty() {
+    return _numInputs == 0;
+  }
+
+  /**
+   * Returns the union of all non-empty sketches accumulated so far, merging any buffered sketches first
+   * when more than one has been accepted.
+   */
+  @Nonnull
+  public Sketch getResult() {
+    return unionAll();
+  }
+
+  /**
+   * Adds a sketch to the accumulator; empty sketches are ignored.
+   */
+  public void apply(Sketch sketch) {
+    internalAdd(sketch);
+  }
+
+  /**
+   * Adds the current result of another accumulator, unless that accumulator is empty.
+   */
+  public void merge(ThetaSketchAccumulator thetaUnion) {
+    if (thetaUnion.isEmpty()) {
+      return;
+    }
+    Sketch sketch = thetaUnion.getResult();
+    internalAdd(sketch);
+  }
+
+  // Rejects negative thresholds up-front instead of failing later when the buffer is allocated.
+  private static int checkThreshold(int threshold) {
+    if (threshold < 0) {
+      throw new IllegalArgumentException("Accumulator threshold must be non-negative, got: " + threshold);
+    }
+    return threshold;
+  }
+
+  // Buffers a non-empty sketch and merges the buffer into the union once the threshold is reached.
+  private void internalAdd(Sketch sketch) {
+    if (sketch.isEmpty()) {
+      return;
+    }
+    if (_accumulator == null) {
+      _accumulator = new ArrayList<>(Math.min(_threshold, MAX_INITIAL_CAPACITY));
+    }
+    _accumulator.add(sketch);
+    _numInputs++;
+
+    if (_accumulator.size() >= _threshold) {
+      unionAll();
+    }
+  }
+
+  /**
+   * Merges the buffered sketches into the union and returns its current result. When exactly one
+   * non-empty sketch has been accepted it is returned unchanged, and an empty accumulator yields the
+   * (empty) result of a fresh union.
+   */
+  private Sketch unionAll() {
+    if (_union == null) {
+      _union = _setOperationBuilder.buildUnion();
+    }
+    // Return the default update "gadget" sketch as a compact sketch
+    if (isEmpty()) {
+      return _union.getResult(_ordered, null);
+    }
+    // Corner-case: the parameters are not strictly respected when there is a single sketch.
+    // This single sketch might have been the result of a previously accumulated union and
+    // would already have the parameters set.  The sketch is returned as-is without adjusting
+    // ordering and nominal entries which requires an additional union operation.
+    if (_numInputs == 1) {
+      return _accumulator.get(0);
+    }
+
+    // Performance optimization: ensure that the minimum Theta is used for "early stop".
+    // The "early stop" optimization is implemented in the Apache Datasketches Union operation for
+    // ordered and compact Theta sketches. Internally, a compact and ordered Theta sketch can be
+    // compared to a sorted array of K items.  When performing a union, only those items from
+    // the input sketch less than Theta need to be processed.  The loop terminates as soon as a hash
+    // is seen that is > Theta.
+    // The following "sort" improves on this further by selecting the minimal Theta value up-front,
+    // which results in fewer redundant entries being retained and subsequently discarded during the
+    // union operation.
+    _accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
+    for (Sketch accumulatedSketch : _accumulator) {
+      _union.union(accumulatedSketch);
+    }
+    _accumulator.clear();
+
+    return _union.getResult(_ordered, null);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
index dcd689afae..2879c474a5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/CustomSerDeUtils.java
@@ -243,9 +243,12 @@ public class CustomSerDeUtils {
 
     @Override
     public byte[] serialize(Sketch value) {
-      // NOTE: Compact the sketch in unsorted, on-heap fashion for performance concern.
-      //       See https://datasketches.apache.org/docs/Theta/ThetaSize.html for more details.
-      return value.compact(false, null).toByteArray();
+      // Compact sketches are written as-is so an ordered sketch stays ordered (enabling the union "early stop"
+      // optimisation); anything else, e.g. an update sketch, is compacted first, keeping its reported ordering.
+      Sketch toWrite = value.isCompact()
+          ? value
+          : value.compact(value.isOrdered(), null);
+      return toWrite.toByteArray();
     }
 
     @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
index 822335cfb0..fdc820c120 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/aggregator/DistinctCountThetaSketchValueAggregatorTest.java
@@ -27,7 +27,9 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 
 
 public class DistinctCountThetaSketchValueAggregatorTest {
@@ -162,4 +164,15 @@ public class DistinctCountThetaSketchValueAggregatorTest {
     byte[][] zeroSketches = {};
     assertEquals(agg.getInitialAggregatedValue(zeroSketches).getEstimate(), 0.0);
   }
+
+  @Test
+  public void shouldRetainSketchOrdering() {
+    UpdateSketch updateSketch = Sketches.updateSketchBuilder().build();
+    IntStream.range(0, 10).forEach(updateSketch::update);
+    DistinctCountThetaSketchValueAggregator aggregator = new DistinctCountThetaSketchValueAggregator();
+
+    // Cloning must keep whichever ordering the compact input sketch has.
+    assertTrue(aggregator.cloneAggregatedValue(updateSketch.compact(true, null)).isOrdered());
+    assertFalse(aggregator.cloneAggregatedValue(updateSketch.compact(false, null)).isOrdered());
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
new file mode 100644
index 0000000000..d34ddd6447
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.customobject;
+
+import java.util.stream.IntStream;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class ThetaSketchAccumulatorTest {
+  private SetOperationBuilder _setOperationBuilder;
+
+  @BeforeMethod
+  public void setUp() {
+    _setOperationBuilder = new SetOperationBuilder();
+  }
+
+  /**
+   * Builds an ordered compact sketch holding the integers in {@code [startInclusive, endExclusive)}.
+   */
+  private static Sketch createSketch(int startInclusive, int endExclusive) {
+    UpdateSketch input = Sketches.updateSketchBuilder().build();
+    IntStream.range(startInclusive, endExclusive).forEach(input::update);
+    return input.compact();
+  }
+
+  @Test
+  public void testEmptyAccumulator() {
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+
+  @Test
+  public void testAccumulatorWithSingleSketch() {
+    Sketch sketch = createSketch(0, 1000);
+
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    accumulator.apply(sketch);
+
+    Assert.assertFalse(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), sketch.getEstimate());
+  }
+
+  @Test
+  public void testAccumulatorMerge() {
+    Sketch sketch1 = createSketch(0, 1000);
+    Sketch sketch2 = createSketch(1000, 2000);
+
+    ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator1.apply(sketch1);
+    ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator2.apply(sketch2);
+    accumulator1.merge(accumulator2);
+
+    Assert.assertEquals(accumulator1.getResult().getEstimate(), sketch1.getEstimate() + sketch2.getEstimate());
+  }
+
+  @Test
+  public void testThresholdBehavior() {
+    Sketch sketch1 = createSketch(0, 1000);
+    Sketch sketch2 = createSketch(1000, 2000);
+
+    // Two inputs stay below the threshold of 3, so the result is computed from the buffer on demand.
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator.apply(sketch1);
+    accumulator.apply(sketch2);
+
+    Assert.assertEquals(accumulator.getResult().getEstimate(), sketch1.getEstimate() + sketch2.getEstimate());
+  }
+
+  @Test
+  public void testThresholdReached() {
+    Sketch sketch1 = createSketch(0, 1000);
+    Sketch sketch2 = createSketch(1000, 2000);
+    Sketch sketch3 = createSketch(2000, 3000);
+
+    // The third input reaches the threshold of 3 and merges the whole buffer into the union.
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    accumulator.apply(sketch1);
+    accumulator.apply(sketch2);
+    accumulator.apply(sketch3);
+
+    Sketch result = accumulator.getResult();
+    double expected = sketch1.getEstimate() + sketch2.getEstimate() + sketch3.getEstimate();
+    Assert.assertFalse(accumulator.isEmpty());
+    Assert.assertEquals(result.getEstimate(), expected);
+    Assert.assertTrue(result.isOrdered());
+  }
+
+  @Test
+  public void testUnionWithEmptyInput() {
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+
+    accumulator.merge(emptyAccumulator);
+
+    Assert.assertTrue(accumulator.isEmpty());
+    Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 09fe5129e0..84bcea58e2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -98,9 +98,9 @@ public class CommonConstants {
     public static final int DEFAULT_HYPERLOGLOG_PLUS_P = 14;
     public static final int DEFAULT_HYPERLOGLOG_PLUS_SP = 0;
 
-    // 2 to the power of 16, for tradeoffs see datasketches library documentation:
+    // 2 to the power of 14, for tradeoffs see datasketches library documentation:
     // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
-    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 16384;
 
     public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org