You are viewing a plain text version of this content. The canonical link for it is not included in this plain text version.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/15 20:08:32 UTC
(pinot) branch master updated: Remove parameters from ThetaSketchAggregation function (#12147)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new aa834f4ccc Remove parameters from ThetaSketchAggregation function (#12147)
aa834f4ccc is described below
commit aa834f4ccce46326b339348efd60f1bd1df162fa
Author: David Cromberge <da...@gmail.com>
AuthorDate: Fri Dec 15 20:08:26 2023 +0000
Remove parameters from ThetaSketchAggregation function (#12147)
---
...inctCountRawThetaSketchAggregationFunction.java | 1 -
...istinctCountThetaSketchAggregationFunction.java | 45 ++++------------------
.../pinot/core/common/ObjectSerDeUtilsTest.java | 6 +--
.../local/customobject/ThetaSketchAccumulator.java | 14 ++-----
.../customobject/ThetaSketchAccumulatorTest.java | 14 +++----
5 files changed, 21 insertions(+), 59 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
index 00d6ec2906..b809193b80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawThetaSketchAggregationFunction.java
@@ -54,7 +54,6 @@ public class DistinctCountRawThetaSketchAggregationFunction extends DistinctCoun
List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
for (ThetaSketchAccumulator accumulator : accumulators) {
- accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 0fea53db34..4a9a846acd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
-import org.apache.datasketches.common.ResizeFactor;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.AnotB;
import org.apache.datasketches.theta.Intersection;
@@ -70,13 +69,11 @@ import org.apache.pinot.sql.parsers.CalciteSqlParser;
* 'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
* </li>
* </ul>
- * Currently, there are 5 parameters to the function:
+ * Currently, there are 3 parameters to the function:
* <ul>
* <li>
* nominalEntries: The nominal entries used to create the sketch. (Default 4096)
- * resizeFactor: Controls the size multiple that affects how fast the internal cache grows (Default 2^3=8)
* samplingProbability: Sets the upfront uniform sampling probability, p. (Default 1.0)
- * intermediateOrdering: Whether compacted sketches should be ordered. (Default false)
* accumulatorThreshold: How many sketches should be kept in memory before merging. (Default 2)
* </li>
* </ul>
@@ -90,15 +87,12 @@ public class DistinctCountThetaSketchAggregationFunction
private static final String SET_DIFF = "setdiff";
private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
- private static final boolean DEFAULT_INTERMEDIATE_ORDERING = false;
-
private final List<ExpressionContext> _inputExpressions;
private final boolean _includeDefaultSketch;
private final List<FilterEvaluator> _filterEvaluators;
private final ExpressionContext _postAggregationExpression;
private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
protected final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
- protected boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
@@ -113,8 +107,6 @@ public class DistinctCountThetaSketchAggregationFunction
Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
// Allows the user to trade-off memory usage for merge CPU; higher values use more memory
_accumulatorThreshold = parameters.getAccumulatorThreshold();
- // Ordering controls whether intermediate compact sketches are ordered in set operations
- _intermediateOrdering = parameters.getIntermediateOrdering();
// Nominal entries controls sketch accuracy and size
int nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(nominalEntries);
@@ -123,10 +115,6 @@ public class DistinctCountThetaSketchAggregationFunction
float p = parameters.getSamplingProbability();
_setOperationBuilder.setP(p);
_updateSketchBuilder.setP(p);
- // Resize factor controls the size multiple that affects how fast the internal cache grows
- ResizeFactor rf = parameters.getResizeFactor();
- _setOperationBuilder.setResizeFactor(rf);
- _updateSketchBuilder.setResizeFactor(rf);
}
if (numArguments < 4) {
@@ -953,7 +941,7 @@ public class DistinctCountThetaSketchAggregationFunction
int numSketches = _filterEvaluators.size() + 1;
List<ThetaSketchAccumulator> sketches = new ArrayList<>(numSketches);
for (int i = 0; i < numSketches; i++) {
- sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold));
+ sketches.add(new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold));
}
return sketches;
}
@@ -963,7 +951,7 @@ public class DistinctCountThetaSketchAggregationFunction
ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
for (Object o : result) {
ThetaSketchAccumulator thetaSketchAccumulator =
- new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+ new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
thetaSketchAccumulator.apply((Sketch) o);
thetaSketchAccumulators.add(thetaSketchAccumulator);
}
@@ -982,7 +970,7 @@ public class DistinctCountThetaSketchAggregationFunction
ArrayList<ThetaSketchAccumulator> thetaSketchAccumulators = new ArrayList<>(numSketches);
for (Object o : result) {
ThetaSketchAccumulator thetaSketchAccumulator =
- new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+ new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
thetaSketchAccumulator.apply((Sketch) o);
thetaSketchAccumulators.add(thetaSketchAccumulator);
}
@@ -1029,7 +1017,6 @@ public class DistinctCountThetaSketchAggregationFunction
List<Sketch> mergedSketches = new ArrayList<>(numAccumulators);
for (ThetaSketchAccumulator accumulator : accumulators) {
- accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
@@ -1270,7 +1257,7 @@ public class DistinctCountThetaSketchAggregationFunction
List<ThetaSketchAccumulator> unions = new ArrayList<>(numUnions);
for (int i = 0; i < numUnions; i++) {
ThetaSketchAccumulator thetaSketchAccumulator =
- new ThetaSketchAccumulator(_setOperationBuilder, _intermediateOrdering, _accumulatorThreshold);
+ new ThetaSketchAccumulator(_setOperationBuilder, _accumulatorThreshold);
unions.add(thetaSketchAccumulator);
}
return unions;
@@ -1311,18 +1298,18 @@ public class DistinctCountThetaSketchAggregationFunction
for (ExpressionContext argument : arguments) {
union.union(evaluatePostAggregationExpression(argument, sketches));
}
- return union.getResult(_intermediateOrdering, null);
+ return union.getResult(false, null);
case SET_INTERSECT:
Intersection intersection = _setOperationBuilder.buildIntersection();
for (ExpressionContext argument : arguments) {
intersection.intersect(evaluatePostAggregationExpression(argument, sketches));
}
- return intersection.getResult(_intermediateOrdering, null);
+ return intersection.getResult(false, null);
case SET_DIFF:
AnotB diff = _setOperationBuilder.buildANotB();
diff.setA(evaluatePostAggregationExpression(arguments.get(0), sketches));
diff.notB(evaluatePostAggregationExpression(arguments.get(1), sketches));
- return diff.getResult(_intermediateOrdering, null, false);
+ return diff.getResult(false, null, false);
default:
throw new IllegalStateException();
}
@@ -1336,15 +1323,11 @@ public class DistinctCountThetaSketchAggregationFunction
private static final char PARAMETER_DELIMITER = ';';
private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
- private static final String RESIZE_FACTOR_KEY = "resizeFactor";
private static final String SAMPLING_PROBABILITY_KEY = "samplingProbability";
- private static final String INTERMEDIATE_ORDERING_KEY = "intermediateOrdering";
private static final String ACCUMULATOR_THRESHOLD_KEY = "accumulatorThreshold";
- private int _resizeFactor = ResizeFactor.X8.getValue();
private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
private int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
- private boolean _intermediateOrdering = DEFAULT_INTERMEDIATE_ORDERING;
private float _samplingProbability = 1.0F;
Parameters(String parametersString) {
@@ -1359,10 +1342,6 @@ public class DistinctCountThetaSketchAggregationFunction
_nominalEntries = Integer.parseInt(value);
} else if (key.equalsIgnoreCase(SAMPLING_PROBABILITY_KEY)) {
_samplingProbability = Float.parseFloat(value);
- } else if (key.equalsIgnoreCase(RESIZE_FACTOR_KEY)) {
- _resizeFactor = Integer.parseInt(value);
- } else if (key.equalsIgnoreCase(INTERMEDIATE_ORDERING_KEY)) {
- _intermediateOrdering = Boolean.parseBoolean(value);
} else if (key.equalsIgnoreCase(ACCUMULATOR_THRESHOLD_KEY)) {
_accumulatorThreshold = Integer.parseInt(value);
} else {
@@ -1379,17 +1358,9 @@ public class DistinctCountThetaSketchAggregationFunction
return _samplingProbability;
}
- boolean getIntermediateOrdering() {
- return _intermediateOrdering;
- }
-
int getAccumulatorThreshold() {
return _accumulatorThreshold;
}
-
- ResizeFactor getResizeFactor() {
- return ResizeFactor.getRF(_resizeFactor);
- }
}
/**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index b9f6f985b5..01c39b0105 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -504,15 +504,14 @@ public class ObjectSerDeUtilsTest {
for (int i = 0; i < NUM_ITERATIONS; i++) {
UpdateSketch input = Sketches.updateSketchBuilder().build();
int size = RANDOM.nextInt(100) + 10;
- boolean shouldOrder = RANDOM.nextBoolean();
for (int j = 0; j < size; j++) {
input.update(j);
}
SetOperationBuilder setOperationBuilder = new SetOperationBuilder();
- ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(setOperationBuilder, shouldOrder, 2);
- Sketch sketch = input.compact(shouldOrder, null);
+ ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(setOperationBuilder, 2);
+ Sketch sketch = input.compact(false, null);
accumulator.apply(sketch);
byte[] bytes = ObjectSerDeUtils.serialize(accumulator);
@@ -521,7 +520,6 @@ public class ObjectSerDeUtilsTest {
assertEquals(actual.getResult().getEstimate(), sketch.getEstimate(), ERROR_MESSAGE);
assertEquals(actual.getResult().toByteArray(), sketch.toByteArray(), ERROR_MESSAGE);
- assertEquals(actual.getResult().isOrdered(), shouldOrder, ERROR_MESSAGE);
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
index b1cb94812c..c9554ce9bf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulator.java
@@ -38,7 +38,6 @@ import org.apache.datasketches.theta.Union;
*/
public class ThetaSketchAccumulator {
private ArrayList<Sketch> _accumulator;
- private boolean _ordered = false;
private SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
private Union _union;
private int _threshold;
@@ -51,16 +50,11 @@ public class ThetaSketchAccumulator {
// happens on serialization. Therefore, when deserialized, the values may be null and will
// require re-initialisation. Since the primary use case is at query time for the Broker
// and Server, these properties are already in memory and are re-set.
- public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, boolean ordered, int threshold) {
+ public ThetaSketchAccumulator(SetOperationBuilder setOperationBuilder, int threshold) {
_setOperationBuilder = setOperationBuilder;
- _ordered = ordered;
_threshold = threshold;
}
- public void setOrdered(boolean ordered) {
- _ordered = ordered;
- }
-
public void setSetOperationBuilder(SetOperationBuilder setOperationBuilder) {
_setOperationBuilder = setOperationBuilder;
}
@@ -111,12 +105,12 @@ public class ThetaSketchAccumulator {
}
// Return the default update "gadget" sketch as a compact sketch
if (isEmpty()) {
- return _union.getResult(_ordered, null);
+ return _union.getResult(false, null);
}
// Corner-case: the parameters are not strictly respected when there is a single sketch.
// This single sketch might have been the result of a previously accumulated union and
// would already have the parameters set. The sketch is returned as-is without adjusting
- // ordering and nominal entries which requires an additional union operation.
+ // nominal entries which requires an additional union operation.
if (_numInputs == 1) {
return _accumulator.get(0);
}
@@ -136,6 +130,6 @@ public class ThetaSketchAccumulator {
}
_accumulator.clear();
- return _union.getResult(_ordered, null);
+ return _union.getResult(false, null);
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
index d34ddd6447..f705e9e45b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/ThetaSketchAccumulatorTest.java
@@ -39,7 +39,7 @@ public class ThetaSketchAccumulatorTest {
@Test
public void testEmptyAccumulator() {
- ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+ ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 2);
Assert.assertTrue(accumulator.isEmpty());
Assert.assertEquals(accumulator.getResult().getEstimate(), 0.0);
}
@@ -50,7 +50,7 @@ public class ThetaSketchAccumulatorTest {
IntStream.range(0, 1000).forEach(input::update);
Sketch sketch = input.compact();
- ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, false, 2);
+ ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 2);
accumulator.apply(sketch);
Assert.assertFalse(accumulator.isEmpty());
@@ -66,9 +66,9 @@ public class ThetaSketchAccumulatorTest {
IntStream.range(1000, 2000).forEach(input2::update);
Sketch sketch2 = input2.compact();
- ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ ThetaSketchAccumulator accumulator1 = new ThetaSketchAccumulator(_setOperationBuilder, 3);
accumulator1.apply(sketch1);
- ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ ThetaSketchAccumulator accumulator2 = new ThetaSketchAccumulator(_setOperationBuilder, 3);
accumulator2.apply(sketch2);
accumulator1.merge(accumulator2);
@@ -84,7 +84,7 @@ public class ThetaSketchAccumulatorTest {
IntStream.range(1000, 2000).forEach(input2::update);
Sketch sketch2 = input2.compact();
- ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
accumulator.apply(sketch1);
accumulator.apply(sketch2);
@@ -93,8 +93,8 @@ public class ThetaSketchAccumulatorTest {
@Test
public void testUnionWithEmptyInput() {
- ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
- ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, true, 3);
+ ThetaSketchAccumulator accumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
+ ThetaSketchAccumulator emptyAccumulator = new ThetaSketchAccumulator(_setOperationBuilder, 3);
accumulator.merge(emptyAccumulator);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org