You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/15 20:08:32 UTC

(pinot) branch master updated: Remove parameters from ThetaSketchAggregation function (#12147)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new aa834f4ccc Remove parameters from ThetaSketchAggregation function (#12147)
aa834f4ccc is described below

commit aa834f4ccce46326b339348efd60f1bd1df162fa
Author: David Cromberge <da...@gmail.com>
AuthorDate: Fri Dec 15 20:08:26 2023 +0000

    Remove parameters from ThetaSketchAggregation function (#12147)
---
 ...inctCountRawThetaSketchAggregationFunction.java |  1 -
 ...istinctCountThetaSketchAggregationFunction.java | 45 ++++------------------
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  6 +--
 .../local/customobject/ThetaSketchAccumulator.java | 14 ++-----
 .../customobject/ThetaSketchAccumulatorTest.java   | 14 +++----
 5 files changed, 21 insertions(+), 59 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
index 00d6ec2906..b809193b80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -54,7 +54,6 @@ public class DistinctCountRawThetaSketchAggregationFunction extends DistinctCoun
     List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
 
     for (ThetaSketchAccumulator accumulator : accumulators) {
-      accumulator.setOrdered(_intermediateOrdering);
       accumulator.setThreshold(_accumulatorThreshold);
       accumulator.setSetOperationBuilder(_setOperationBuilder);
       mergedSketches.add(accumulator.getResult());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 0fea53db34..4a9a846acd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.datasketches.common.ResizeFactor;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.AnotB;
 import org.apache.datasketches.theta.Intersection;
@@ -70,13 +69,11 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
  *     'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
  *   </li>
  * </ul>
- * Currently, there are 5 parameters to the function:
+ * Currently, there are 3 parameters to the function:
  * <ul>
  *   <li>
  *     nominalEntries: The nominal entries used to create the sketch. (Default 4096)
- *     resizeFactor: Controls the size multiple that affects how fast the internal cache grows (Default 2^3=8)
  *     samplingProbability: Sets the upfront uniform sampling probability, p. (Default 1.0)
- *     intermediateOrdering: Whether compacted sketches should be ordered. (Default false)
  *     accumulatorThreshold: How many sketches should be kept in memory before merging. (Default 2)
  *   </li>
  * </ul>
@@ -90,15 +87,12 @@ public class DistinctCountThetaSketchAggregationFunction
   private static final String SET_DIFF = "setdiff";
   private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
   private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
-  private static final boolean DEFAULT_INTERMEDIATE_ORDERING = false;
-
   private final List<ExpressionContext> _inputExpressions;
   private final boolean _includeDefaultSketch;
   private final List<FilterEvaluator> _filterEvaluators;
   private final ExpressionContext _postAggregationExpression;
   private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
   protected final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
-  protected boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
   protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
 
   public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
@@ -113,8 +107,6 @@ public class DistinctCountThetaSketchAggregationFunction
       Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
       // Allows the user to trade-off memory usage for merge CPU; higher values use more memory
       _accumulatorThreshold = parameters.getAccumulatorThreshold();
-      // Ordering controls whether intermediate compact sketches are ordered in set operations
-      _intermediateOrdering = parameters.getIntermediateOrdering();
       // Nominal entries controls sketch accuracy and size
       int nominalEntries = parameters.getNominalEntries();
       _updateSketchBuilder.setNominalEntries(nominalEntries);
@@ -123,10 +115,6 @@ public class DistinctCountThetaSketchAggregationFunction
       float p = parameters.getSamplingProbability();
       _setOperationBuilder.setP(p);
       _updateSketchBuilder.setP(p);
-      // Resize factor controls the size multiple that affects how fast the internal cache grows
-      ResizeFactor rf = parameters.getResizeFactor();
-      _setOperationBuilder.setResizeFactor(rf);
-      _updateSketchBuilder.setResizeFactor(rf);
     }
 
     if (numArguments < 4) {
@@ -953,7 +941,7 @@ public class DistinctCountThetaSketchAggregationFunction
       int numSketches = _filterEvaluators.size() + 1;
       List<ThetaSketchAccumulator> sketches = new ArrayList<>(numSketches);
       for (int i = 0; i < numSketches; i++) {
-        sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold));
+        sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold));
       }
       return sketches;
     }
@@ -963,7 +951,7 @@ public class DistinctCountThetaSketchAggregationFunction
       ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
       for (Object o : result) {
         ThetaSketchAccumulator thetaSketchAccumulator =
-            new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+            new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
         thetaSketchAccumulator.apply((Sketch) o);
         thetaSketchAccumulators.add(thetaSketchAccumulator);
       }
@@ -982,7 +970,7 @@ public class DistinctCountThetaSketchAggregationFunction
       ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
       for (Object o : result) {
         ThetaSketchAccumulator thetaSketchAccumulator =
-            new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+            new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
         thetaSketchAccumulator.apply((Sketch) o);
         thetaSketchAccumulators.add(thetaSketchAccumulator);
       }
@@ -1029,7 +1017,6 @@ public class DistinctCountThetaSketchAggregationFunction
     List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
 
     for (ThetaSketchAccumulator accumulator : accumulators) {
-      accumulator.setOrdered(_intermediateOrdering);
       accumulator.setThreshold(_accumulatorThreshold);
       accumulator.setSetOperationBuilder(_setOperationBuilder);
       mergedSketches.add(accumulator.getResult());
@@ -1270,7 +1257,7 @@ public class DistinctCountThetaSketchAggregationFunction
     List<ThetaSketchAccumulator> unions = new ArrayList<>(numUnions);
     for (int i = 0; i < numUnions; i++) {
       ThetaSketchAccumulator thetaSketchAccumulator =
-          new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+          new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
       unions.add(thetaSketchAccumulator);
     }
     return unions;
@@ -1311,18 +1298,18 @@ public class DistinctCountThetaSketchAggregationFunction
         for (ExpressionContext argument : arguments) {
           union.union(evaluatePostAggregationExpression(argument, sketches));
         }
-        return union.getResult(_intermediateOrdering, null);
+        return union.getResult(false, null);
       case SET_INTERSECT:
         Intersection intersection = _setOperationBuilder.buildIntersection();
         for (ExpressionContext argument : arguments) {
           intersection.intersect(evaluatePostAggregationExpression(argument, sketches));
         }
-        return intersection.getResult(_intermediateOrdering, null);
+        return intersection.getResult(false, null);
       case SET_DIFF:
         AnotB diff = _setOperationBuilder.buildANotB();
         diff.setA(evaluatePostAggregationExpression(arguments.get(0), sketches));
         diff.notB(evaluatePostAggregationExpression(arguments.get(1), sketches));
-        return diff.getResult(_intermediateOrdering, null, false);
+        return diff.getResult(false, null, false);
       default:
         throw new IllegalStateException();
     }
@@ -1336,15 +1323,11 @@ public class DistinctCountThetaSketchAggregationFunction
     private static final char PARAMETER_DELIMITER = ';';
     private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
     private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
-    private static final String RESIZE_FACTOR_KEY = "resizeFactor";
     private static final String SAMPLING_PROBABILITY_KEY = "samplingProbability";
-    private static final String INTERMEDIATE_ORDERING_KEY = "intermediateOrdering";
     private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold";
 
-    private int _resizeFactor = ResizeFactor.X8.getValue();
     private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
     private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
-    private boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
     private float _samplingProbability = 1.0F;
 
     Parameters(String parametersString) {
@@ -1359,10 +1342,6 @@ public class DistinctCountThetaSketchAggregationFunction
           _nominalEntries = Integer.parseInt(value);
         } else if (key.equalsIgnoreCase(SAMPLING_PROBABILITY_KEY)) {
           _samplingProbability = Float.parseFloat(value);
-        } else if (key.equalsIgnoreCase(RESIZE_FACTOR_KEY)) {
-          _resizeFactor = Integer.parseInt(value);
-        } else if (key.equalsIgnoreCase(INTERMEDIATE_ORDERING_KEY)) {
-          _intermediateOrdering = Boolean.parseBoolean(value);
         } else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
           _accumulatorThreshold = Integer.parseInt(value);
         } else {
@@ -1379,17 +1358,9 @@ public class DistinctCountThetaSketchAggregationFunction
       return _samplingProbability;
     }
 
-    boolean getIntermediateOrdering() {
-      return _intermediateOrdering;
-    }
-
     int getAccumulatorThreshold() {
       return _accumulatorThreshold;
     }
-
-    ResizeFactor getResizeFactor() {
-      return ResizeFactor.getRF(_resizeFactor);
-    }
   }
 
   /**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index b9f6f985b5..01c39b0105 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -504,15 +504,14 @@ public class ObjectSerDeUtilsTest {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
       UpdateSketch input = Sketches.updateSketchBuilder().build();
       int size = RANDOM.nextInt(100) + 10;
-      boolean shouldOrder = RANDOM.nextBoolean();
 
       for (int j = 0; j < size; j++) {
         input.update(j);
       }
 
       SetOperationBuilder setOperationBuilder = new SetOperationBuilder();
-      ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(setOperationBuilder, shouldOrder, 2);
-      Sketch sketch = input.compact(shouldOrder, null);
+      ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(setOperationBuilder, 2);
+      Sketch sketch = input.compact(false, null);
       accumulator.apply(sketch);
 
       byte[] bytes = ObjectSerDeUtils.serialize(accumulator);
@@ -521,7 +520,6 @@ public class ObjectSerDeUtilsTest {
 
       assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
       assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), ERROR_MESSAGE);
-      assertEquals(actual.getResult().isOrdered(), shouldOrder, ERROR_MESSAGE);
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
index b1cb94812c..c9554ce9bf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -38,7 +38,6 @@ import org.apache.datasketches.theta.Union;
  */
 public class ThetaSketchAccumulator {
   private ArrayList<Sketch> _accumulator;
-  private boolean _ordered = false;
   private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
   private Union _union;
   private int _threshold;
@@ -51,16 +50,11 @@ public class ThetaSketchAccumulator {
   // happens on serialization. Therefore, when deserialized, the values may be null and will
   // require re-initialisation. Since the primary use case is at query time for the Broker
   // and Server, these properties are already in memory and are re-set.
-  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+  public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int threshold) {
     _setOperationBuilder = setOperationBuilder;
-    _ordered = ordered;
     _threshold = threshold;
   }
 
-  public void setOrdered(boolean ordered) {
-    _ordered = ordered;
-  }
-
   public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
     _setOperationBuilder = setOperationBuilder;
   }
@@ -111,12 +105,12 @@ public class ThetaSketchAccumulator {
     }
     // Return the default update "gadget" sketch as a compact sketch
     if (isEmpty()) {
-      return _union.getResult(_ordered, null);
+      return _union.getResult(false, null);
     }
     // Corner-case: the parameters are not strictly respected when there is a single sketch.
     // This single sketch might have been the result of a previously accumulated union and
     // would already have the parameters set.  The sketch is returned as-is without adjusting
-    // ordering and nominal entries which requires an additional union operation.
+    // nominal entries which requires an additional union operation.
     if (_numInputs == 1) {
       return _accumulator.get(0);
     }
@@ -136,6 +130,6 @@ public class ThetaSketchAccumulator {
     }
     _accumulator.clear();
 
-    return _union.getResult(_ordered, null);
+    return _union.getResult(false, null);
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
index d34ddd6447..f705e9e45b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
@@ -39,7 +39,7 @@ public class ThetaSketchAccumulatorTest {
 
   @Test
   public void testEmptyAccumulator() {
-    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 2);
     Assert.assertTrue(accumulator.isEmpty());
     Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
   }
@@ -50,7 +50,7 @@ public class ThetaSketchAccumulatorTest {
     IntStream.range(0, 1000).forEach(input::update);
     Sketch sketch = input.compact();
 
-    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 2);
     accumulator.apply(sketch);
 
     Assert.assertFalse(accumulator.isEmpty());
@@ -66,9 +66,9 @@ public class ThetaSketchAccumulatorTest {
     IntStream.range(1000, 2000).forEach(input2::update);
     Sketch sketch2 = input2.compact();
 
-    ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, 3);
     accumulator1.apply(sketch1);
-    ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, 3);
     accumulator2.apply(sketch2);
     accumulator1.merge(accumulator2);
 
@@ -84,7 +84,7 @@ public class ThetaSketchAccumulatorTest {
     IntStream.range(1000, 2000).forEach(input2::update);
     Sketch sketch2 = input2.compact();
 
-    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
     accumulator.apply(sketch1);
     accumulator.apply(sketch2);
 
@@ -93,8 +93,8 @@ public class ThetaSketchAccumulatorTest {
 
   @Test
   public void testUnionWithEmptyInput() {
-    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
-    ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+    ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
+    ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
 
     accumulator.merge(emptyAccumulator);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org