You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/03 22:33:38 UTC
[incubator-pinot] branch master updated: Add RawThetaSketchAggregationFunction (#5970)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a21ec8a Add RawThetaSketchAggregationFunction (#5970)
a21ec8a is described below
commit a21ec8ac47a018efdbadf06160e2e6d46c8f7287
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Sep 3 15:33:25 2020 -0700
Add RawThetaSketchAggregationFunction (#5970)
Introduce `RawThetaSketchAggregationFunction` which collects the values for a given expression (can be single-valued or multi-valued) into a Sketch object, and returns the sketch as a base64 encoded string. It treats `BYTES` expressions as serialized sketches.
It takes an optional second argument as the parameters for the function, e.g. `RAW_THETA_SKETCH(col, 'nominalEntries=8192')`.
---
.../common/function/AggregationFunctionType.java | 1 +
.../core/query/aggregation/ThetaSketchParams.java | 66 ---
.../function/AggregationFunctionFactory.java | 2 +
...istinctCountThetaSketchAggregationFunction.java | 60 +--
.../RawThetaSketchAggregationFunction.java | 563 +++++++++++++++++++++
.../BrokerRequestToQueryContextConverter.java | 7 +-
.../queries/DistinctCountThetaSketchTest.java | 226 ++++-----
7 files changed, 685 insertions(+), 240 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index 2f3085c..5f96e53 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -43,6 +43,7 @@ public enum AggregationFunctionType {
PERCENTILE("percentile"),
PERCENTILEEST("percentileEst"),
PERCENTILETDIGEST("percentileTDigest"),
+ RAWTHETASKETCH("rawThetaSketch"),
IDSET("idSet"),
// Geo aggregation functions
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java
deleted file mode 100644
index c9c1f03..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ThetaSketchParams.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * Class to hold Theta Sketch Params, and is Json serializable.
- */
-@SuppressWarnings("unused")
-public class ThetaSketchParams {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final String PARAMS_DELIMITER = ";";
- private static final String PARAM_KEY_VALUE_SEPARATOR = "=";
-
- @JsonProperty("nominalEntries")
- private int _nominalEntries;
-
- public int getNominalEntries() {
- return _nominalEntries;
- }
-
- public void setNominalEntries(int nominalEntries) {
- _nominalEntries = nominalEntries;
- }
-
- /**
- * Creates a ThetaSketchParams object from the specified string. The specified string is
- * expected to be of form: "key1 = value1; key2 = value2.."
- * @param paramsString Param in string form
- * @return Param object, null if string is null or empty
- */
- public static ThetaSketchParams fromString(String paramsString) {
- if (paramsString == null || paramsString.isEmpty()) {
- return null;
- }
-
- ObjectNode objectNode = JsonUtils.newObjectNode();
- for (String pair : paramsString.split(PARAMS_DELIMITER)) {
- String[] keyValue = pair.split(PARAM_KEY_VALUE_SEPARATOR);
- objectNode.put(keyValue[0].replaceAll("\\s", ""), keyValue[1].replaceAll("\\s", ""));
- }
-
- return OBJECT_MAPPER.convertValue(objectNode, ThetaSketchParams.class);
- }
-}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index d74db3e..f0a7d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -137,6 +137,8 @@ public class AggregationFunctionFactory {
return new DistinctCountThetaSketchAggregationFunction(arguments);
case DISTINCTCOUNTRAWTHETASKETCH:
return new DistinctCountRawThetaSketchAggregationFunction(arguments);
+ case RAWTHETASKETCH:
+ return new RawThetaSketchAggregationFunction(arguments);
case IDSET:
return new IdSetAggregationFunction(arguments);
case COUNTMV:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
index 42b8eaf..049bead 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountThetaSketchAggregationFunction.java
@@ -32,7 +32,6 @@ import org.apache.commons.collections.MapUtils;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.AnotB;
import org.apache.datasketches.theta.Intersection;
-import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.SetOperationBuilder;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
@@ -43,7 +42,7 @@ import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ThetaSketchParams;
+import org.apache.pinot.core.query.aggregation.function.RawThetaSketchAggregationFunction.Parameters;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.core.query.request.context.ExpressionContext;
@@ -70,16 +69,14 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
public static final String CSV_VALUES = String.join(",", STRING_VALUES);
public static boolean isValid(String name) {
- return SET_UNION.name().equalsIgnoreCase(name)
- || SET_INTERSECT.name().equalsIgnoreCase(name)
- || SET_DIFF.name().equalsIgnoreCase(name);
+ return SET_UNION.name().equalsIgnoreCase(name) || SET_INTERSECT.name().equalsIgnoreCase(name) || SET_DIFF.name()
+ .equalsIgnoreCase(name);
}
}
private static final Pattern ARGUMENT_SUBSTITUTION = Pattern.compile("\\$(\\d+)");
private final ExpressionContext _thetaSketchColumn;
- private final ThetaSketchParams _thetaSketchParams;
private final SetOperationBuilder _setOperationBuilder;
private final List<ExpressionContext> _inputExpressions;
private final ExpressionContext _postAggregationExpression;
@@ -111,13 +108,13 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
"First argument of DistinctCountThetaSketch must be identifier (theta-sketch column)");
// Initialize the theta-sketch parameters
- ExpressionContext paramsExpression = arguments.get(1);
- Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+ ExpressionContext parametersExpression = arguments.get(1);
+ Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
"Second argument of DistinctCountThetaSketch must be literal (parameters)");
- _thetaSketchParams = ThetaSketchParams.fromString(paramsExpression.getLiteral());
+ Parameters parameters = new Parameters(parametersExpression.getLiteral());
// Initialize the theta-sketch set operation builder
- _setOperationBuilder = getSetOperationBuilder();
+ _setOperationBuilder = new SetOperationBuilder().setNominalEntries(parameters.getNominalEntries());
// Index of the original input predicates
// This list is zero indexed, whereas argument substitution is 1-indexed: index[0] = $1
@@ -131,7 +128,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
// Initialize the post-aggregation expression
// NOTE: It is modeled as a filter
ExpressionContext postAggregationExpression = arguments.get(numArguments - 1);
- Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+ Preconditions.checkArgument(parametersExpression.getType() == ExpressionContext.Type.LITERAL,
"Last argument of DistinctCountThetaSketch must be literal (post-aggregation expression)");
_postAggregationExpression = QueryContextConverterUtils
.getExpression(CalciteSqlParser.compileToExpression(postAggregationExpression.getLiteral()));
@@ -448,7 +445,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
Sketch sketch = intermediateResult2.get(predicate);
if (sketch != null) {
// Merge the overlapping ones
- Union union = getSetOperationBuilder().buildUnion();
+ Union union = _setOperationBuilder.buildUnion();
union.update(entry.getValue());
union.update(sketch);
mergedResult.put(predicate, union.getResult());
@@ -485,7 +482,7 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
@Override
public Long extractFinalResult(Map<String, Sketch> intermediateResult) {
Sketch finalSketch = extractFinalSketch(intermediateResult);
- return Math.round(finalSketch.getEstimate());
+ return finalSketch != null ? Math.round(finalSketch.getEstimate()) : 0;
}
private Predicate getPredicate(String predicateString) {
@@ -544,25 +541,22 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
* @param sketchMap Precomputed sketches for predicates that are part of the expression.
* @return Overall evaluated sketch for the expression.
*/
- private Sketch evalPostAggregationExpression(
- ExpressionContext postAggregationExpression,
+ private Sketch evalPostAggregationExpression(ExpressionContext postAggregationExpression,
Map<Predicate, Sketch> sketchMap) {
if (postAggregationExpression.getType() == ExpressionContext.Type.LITERAL) {
throw new IllegalArgumentException("Literal not supported in post-aggregation function");
}
if (postAggregationExpression.getType() == ExpressionContext.Type.IDENTIFIER) {
- final Predicate exp =
- _predicates.get(extractSubstitutionPosition(postAggregationExpression.getLiteral()) - 1);
+ final Predicate exp = _predicates.get(extractSubstitutionPosition(postAggregationExpression.getLiteral()) - 1);
return sketchMap.get(exp);
}
// shouldn't throw exception because of the validation in the constructor
- MergeFunction func =
- MergeFunction.valueOf(postAggregationExpression.getFunction().getFunctionName().toUpperCase());
+ MergeFunction func = MergeFunction.valueOf(postAggregationExpression.getFunction().getFunctionName().toUpperCase());
// handle functions recursively
- switch(func) {
+ switch (func) {
case SET_UNION:
Union union = _setOperationBuilder.buildUnion();
for (ExpressionContext exp : postAggregationExpression.getFunction().getArguments()) {
@@ -583,10 +577,8 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
diff.update(a, b);
return diff.getResult();
default:
- throw new IllegalStateException(
- String.format(
- "Invalid post-aggregation function: %s",
- postAggregationExpression.getFunction().getFunctionName().toUpperCase()));
+ throw new IllegalStateException(String.format("Invalid post-aggregation function: %s",
+ postAggregationExpression.getFunction().getFunctionName().toUpperCase()));
}
}
@@ -606,16 +598,6 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
}
/**
- * Returns the theta-sketch SetOperation builder properly configured.
- * Currently, only setting of nominalEntries is supported.
- * @return SetOperationBuilder
- */
- private SetOperationBuilder getSetOperationBuilder() {
- return _thetaSketchParams == null ? SetOperation.builder()
- : SetOperation.builder().setNominalEntries(_thetaSketchParams.getNominalEntries());
- }
-
- /**
* Validates that the function context's substitution parameters ($1, $2, etc) does not exceed the number
* of predicates passed into the post-aggregation function.
*
@@ -636,19 +618,19 @@ public class DistinctCountThetaSketchAggregationFunction implements AggregationF
if (context.getType() == ExpressionContext.Type.IDENTIFIER) {
int id = extractSubstitutionPosition(context.getIdentifier());
- if (id <= 0)
+ if (id <= 0) {
throw new IllegalArgumentException("Argument substitution starts at $1");
- if (id > numPredicates)
+ }
+ if (id > numPredicates) {
throw new IllegalArgumentException("Argument substitution exceeded number of predicates");
+ }
// if none of the invalid conditions are met above, exit out early
return;
}
if (!MergeFunction.isValid(context.getFunction().getFunctionName())) {
throw new IllegalArgumentException(
- String.format(
- "Invalid Theta Sketch aggregation function. Allowed: [%s]",
- MergeFunction.CSV_VALUES));
+ String.format("Invalid Theta Sketch aggregation function. Allowed: [%s]", MergeFunction.CSV_VALUES));
}
switch (MergeFunction.valueOf(context.getFunction().getFunctionName().toUpperCase())) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java
new file mode 100644
index 0000000..94c952d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/RawThetaSketchAggregationFunction.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.datasketches.Util;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.SetOperationBuilder;
+import org.apache.datasketches.theta.Sketch;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+import org.apache.datasketches.theta.UpdateSketchBuilder;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The {@code RawThetaSketchAggregationFunction} collects the values for a given expression (can be single-valued or
+ * multi-valued) into a {@link Sketch} object, and returns the sketch as a base64 encoded string. It treats BYTES
+ * expressions as serialized sketches.
+ * <p>The function takes an optional second argument as the parameters for the function. Currently there is only 1
+ * parameter for the function:
+ * <ul>
+ * <li>
+ * nominalEntries: The nominal entries used to create the sketch. (Default 4096)
+ * </li>
+ * </ul>
+ * <p>Example: RAW_THETA_SKETCH(col, 'nominalEntries=8192')
+ */
+public class RawThetaSketchAggregationFunction extends BaseSingleInputAggregationFunction<Sketch, String> {
+ private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
+ private final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
+
+ public RawThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
+ super(arguments.get(0));
+
+ // Optional second argument for theta-sketch parameters
+ if (arguments.size() > 1) {
+ ExpressionContext paramsExpression = arguments.get(1);
+ Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
+ "Second argument of RAW_THETA_SKETCH aggregation function must be literal (parameters)");
+ Parameters parameters = new Parameters(paramsExpression.getLiteral());
+ int nominalEntries = parameters.getNominalEntries();
+ _updateSketchBuilder.setNominalEntries(nominalEntries);
+ _setOperationBuilder.setNominalEntries(nominalEntries);
+ }
+ }
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.RAWTHETASKETCH;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ FieldSpec.DataType valueType = blockValSet.getValueType();
+
+ if (valueType != FieldSpec.DataType.BYTES) {
+ UpdateSketch updateSketch = getUpdateSketch(aggregationResultHolder);
+ if (blockValSet.isSingleValue()) {
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ updateSketch.update(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ updateSketch.update(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ updateSketch.update(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ updateSketch.update(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ updateSketch.update(stringValues[i]);
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ } else {
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (int value : intValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (long value : longValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (float value : floatValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (double value : doubleValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ for (String value : stringValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ }
+ } else {
+ // Serialized sketch
+ Union union = getUnion(aggregationResultHolder);
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ union.update(Sketch.wrap(Memory.wrap(bytesValues[i])));
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ FieldSpec.DataType valueType = blockValSet.getValueType();
+
+ if (blockValSet.isSingleValue()) {
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(intValues[i]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(longValues[i]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(floatValues[i]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(doubleValues[i]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUpdateSketch(groupByResultHolder, groupKeyArray[i]).update(stringValues[i]);
+ }
+ break;
+ case BYTES:
+ // Serialized sketch
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ getUnion(groupByResultHolder, groupKeyArray[i]).update(Sketch.wrap(Memory.wrap(bytesValues[i])));
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ } else {
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (int value : intValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (long value : longValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (float value : floatValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (double value : doubleValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKeyArray[i]);
+ for (String value : stringValues[i]) {
+ updateSketch.update(value);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ FieldSpec.DataType valueType = blockValSet.getValueType();
+
+ if (blockValSet.isSingleValue()) {
+ switch (valueType) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int i = 0; i < length; i++) {
+ int value = intValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getUpdateSketch(groupByResultHolder, groupKey).update(value);
+ }
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long value = longValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getUpdateSketch(groupByResultHolder, groupKey).update(value);
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int i = 0; i < length; i++) {
+ float value = floatValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getUpdateSketch(groupByResultHolder, groupKey).update(value);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int i = 0; i < length; i++) {
+ double value = doubleValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getUpdateSketch(groupByResultHolder, groupKey).update(value);
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int i = 0; i < length; i++) {
+ String value = stringValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ getUpdateSketch(groupByResultHolder, groupKey).update(value);
+ }
+ }
+ break;
+ case BYTES:
+ // Serialized sketch
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ Sketch sketch = Sketch.wrap(Memory.wrap(bytesValues[i]));
+ for (int groupKey : groupKeysArray[i]) {
+ getUnion(groupByResultHolder, groupKey).update(sketch);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal single-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ } else {
+ switch (valueType) {
+ case INT:
+ int[][] intValues = blockValSet.getIntValuesMV();
+ for (int i = 0; i < length; i++) {
+ int[] values = intValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+ for (int value : values) {
+ updateSketch.update(value);
+ }
+ }
+ }
+ break;
+ case LONG:
+ long[][] longValues = blockValSet.getLongValuesMV();
+ for (int i = 0; i < length; i++) {
+ long[] values = longValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+ for (long value : values) {
+ updateSketch.update(value);
+ }
+ }
+ }
+ break;
+ case FLOAT:
+ float[][] floatValues = blockValSet.getFloatValuesMV();
+ for (int i = 0; i < length; i++) {
+ float[] values = floatValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+ for (float value : values) {
+ updateSketch.update(value);
+ }
+ }
+ }
+ break;
+ case DOUBLE:
+ double[][] doubleValues = blockValSet.getDoubleValuesMV();
+ for (int i = 0; i < length; i++) {
+ double[] values = doubleValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+ for (double value : values) {
+ updateSketch.update(value);
+ }
+ }
+ }
+ break;
+ case STRING:
+ String[][] stringValues = blockValSet.getStringValuesMV();
+ for (int i = 0; i < length; i++) {
+ String[] values = stringValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ UpdateSketch updateSketch = getUpdateSketch(groupByResultHolder, groupKey);
+ for (String value : values) {
+ updateSketch.update(value);
+ }
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException(
+ "Illegal multi-value data type for RAW_THETA_SKETCH aggregation function: " + valueType);
+ }
+ }
+ }
+
+ @Override
+ public Sketch extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+ Object result = aggregationResultHolder.getResult();
+ if (result == null) {
+ return _updateSketchBuilder.build();
+ } else {
+ if (result instanceof Sketch) {
+ return (Sketch) result;
+ } else {
+ assert result instanceof Union;
+ return ((Union) result).getResult();
+ }
+ }
+ }
+
+ @Override
+ public Sketch extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+ Object result = groupByResultHolder.getResult(groupKey);
+ if (result instanceof Sketch) {
+ return (Sketch) result;
+ } else {
+ assert result instanceof Union;
+ return ((Union) result).getResult();
+ }
+ }
+
+ @Override
+ public Sketch merge(Sketch sketch1, Sketch sketch2) {
+ Union union = _setOperationBuilder.buildUnion();
+ union.update(sketch1);
+ union.update(sketch2);
+ return union.getResult();
+ }
+
+ @Override
+ public boolean isIntermediateResultComparable() {
+ return false;
+ }
+
+ @Override
+ public ColumnDataType getIntermediateResultColumnType() {
+ return ColumnDataType.OBJECT;
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.STRING;
+ }
+
+ @Override
+ public String extractFinalResult(Sketch sketch) {
+ return Base64.getEncoder().encodeToString(sketch.compact().toByteArray());
+ }
+
+ /**
+ * Returns the UpdateSketch from the result holder or creates a new one if it does not exist.
+ */
+ private UpdateSketch getUpdateSketch(AggregationResultHolder aggregationResultHolder) {
+ UpdateSketch updateSketch = aggregationResultHolder.getResult();
+ if (updateSketch == null) {
+ updateSketch = _updateSketchBuilder.build();
+ aggregationResultHolder.setValue(updateSketch);
+ }
+ return updateSketch;
+ }
+
+ /**
+ * Returns the Union from the result holder or creates a new one if it does not exist.
+ */
+ private Union getUnion(AggregationResultHolder aggregationResultHolder) {
+ Union union = aggregationResultHolder.getResult();
+ if (union == null) {
+ union = _setOperationBuilder.buildUnion();
+ aggregationResultHolder.setValue(union);
+ }
+ return union;
+ }
+
+ /**
+ * Returns the UpdateSketch for the given group key or creates a new one if it does not exist.
+ */
+ private UpdateSketch getUpdateSketch(GroupByResultHolder groupByResultHolder, int groupKey) {
+ UpdateSketch updateSketch = groupByResultHolder.getResult(groupKey);
+ if (updateSketch == null) {
+ updateSketch = _updateSketchBuilder.build();
+ groupByResultHolder.setValueForKey(groupKey, updateSketch);
+ }
+ return updateSketch;
+ }
+
+ /**
+ * Returns the Union for the given group key or creates a new one if it does not exist.
+ */
+ private Union getUnion(GroupByResultHolder groupByResultHolder, int groupKey) {
+ Union union = groupByResultHolder.getResult(groupKey);
+ if (union == null) {
+ union = _setOperationBuilder.buildUnion();
+ groupByResultHolder.setValueForKey(groupKey, union);
+ }
+ return union;
+ }
+
+ /**
+ * Parses theta-sketch parameters given as "key1=value1;key2=value2" (whitespace ignored, keys case-insensitive).
+ */
+ static class Parameters {
+ private static final char PARAMETER_DELIMITER = ';';
+ private static final char PARAMETER_KEY_VALUE_SEPARATOR = '=';
+ private static final String NOMINAL_ENTRIES_KEY = "nominalEntries";
+
+ private int _nominalEntries = Util.DEFAULT_NOMINAL_ENTRIES;
+
+ Parameters(String parametersString) {
+ String compactParameters = StringUtils.deleteWhitespace(parametersString); // Strings are immutable: use the result
+ String[] keyValuePairs = StringUtils.split(compactParameters, PARAMETER_DELIMITER);
+ for (String keyValuePair : keyValuePairs) {
+ String[] keyAndValue = StringUtils.split(keyValuePair, PARAMETER_KEY_VALUE_SEPARATOR);
+ Preconditions.checkArgument(keyAndValue.length == 2, "Invalid parameter: %s", keyValuePair);
+ String key = keyAndValue[0];
+ String value = keyAndValue[1];
+ if (key.equalsIgnoreCase(NOMINAL_ENTRIES_KEY)) {
+ _nominalEntries = Integer.parseInt(value);
+ } else {
+ throw new IllegalArgumentException("Invalid parameter key: " + key); // Reject unknown keys instead of ignoring them
+ }
+ }
+ }
+
+ int getNominalEntries() {
+ return _nominalEntries;
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index 2859450..57beb3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -120,10 +120,11 @@ public class BrokerRequestToQueryContextConverter {
List<ExpressionContext> arguments = new ArrayList<>(numArguments);
if (functionName.equalsIgnoreCase(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName()) || functionName
.equalsIgnoreCase(AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH.getName()) || functionName
+ .equalsIgnoreCase(AggregationFunctionType.RAWTHETASKETCH.name()) || functionName
.equalsIgnoreCase(AggregationFunctionType.IDSET.getName())) {
- // NOTE: For DistinctCountThetaSketch, DistinctCountRawThetaSketch and IdSet, because of the legacy behavior
- // of PQL compiler treating string literal as identifier in aggregation, here we treat all expressions
- // except for the first one as string literal.
+ // NOTE: For DistinctCountThetaSketch, DistinctCountRawThetaSketch, RawThetaSketch and IdSet, because of the
+ // legacy behavior of PQL compiler treating string literal as identifier in aggregation, here we treat
+ // all expressions except for the first one as string literal.
arguments.add(QueryContextConverterUtils.getExpression(stringExpressions.get(0)));
for (int i = 1; i < numArguments; i++) {
arguments.add(ExpressionContext.forLiteral(stringExpressions.get(i)));
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 0ad20a6..2627b78 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -18,23 +18,21 @@
*/
package org.apache.pinot.queries;
-import static org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction.MergeFunction;
-
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
-import joptsimple.internal.Strings;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.UpdateSketch;
import org.apache.datasketches.theta.UpdateSketchBuilder;
import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.segment.ReadMode;
@@ -52,14 +50,14 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
/**
* Unit tests for {@link org.apache.pinot.core.query.aggregation.function.DistinctCountThetaSketchAggregationFunction}.
@@ -80,6 +78,7 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
private static final long RANDOM_SEED = System.nanoTime();
private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;
@@ -128,7 +127,7 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
@DataProvider(name = "badQueries")
public Object[][] badQueries() {
- return new Object[][] {
+ return new Object[][]{
// need at least 4 arguments in agg func
{"select distinctCountThetaSketch(colTS, 'nominalEntries=123', '$0') from testTable"},
// substitution arguments should start at $1
@@ -142,59 +141,47 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
// union with < 2 arguments
{"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_UNION($1)')"},
// intersect with < 2 arguments
- {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_INTERSECT($1)')"}
- };
+ {"select distinctCountThetaSketch(colTS, 'nominalEntries=123', 'colA = 1', 'SET_INTERSECT($1)')"}};
}
private void testThetaSketches(boolean groupBy, boolean sql) {
- String tsQuery, distinctQuery;
- String thetaSketchParams = "nominalEntries=1001";
-
- List<String> predicateStrings = Collections.singletonList("colA = 1");
- String substitution = "$1";
- String whereClause = Strings.join(predicateStrings, " or ");
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
- distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
- testQuery(tsQuery, distinctQuery, groupBy, sql, false);
+ String parameters = "nominalEntries=1001";
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
- testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+ List<String> predicates = Collections.singletonList("colA = 1");
+ String postAggregationExpression = "$1";
+ String filter = "colA = 1";
+ testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
// Test Intersection (AND)
- predicateStrings = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
- substitution = "SET_INTERSECT($1, $2, $3)";
- whereClause = Strings.join(predicateStrings, " and ");
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
- distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
- testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
- testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+ predicates = Arrays.asList("colA = 1", "colB >= 2.0", "colC <> 'colC_1'");
+ postAggregationExpression = "SET_INTERSECT($1, $2, $3)";
+ filter = StringUtils.join(predicates, " and ");
+ testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
// Test Union (OR)
- predicateStrings = Arrays.asList("colA = 1", "colB = 1.9");
- substitution = "SET_UNION($1, $2)";
- whereClause = Strings.join(predicateStrings, " or ");
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
- distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
- testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
- testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+ predicates = Arrays.asList("colA = 1", "colB = 1.9");
+ postAggregationExpression = "SET_UNION($1, $2)";
+ filter = StringUtils.join(predicates, " or ");
+ testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
// Test complex predicates
- predicateStrings = Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
- // operator precedence. ORs are evaluated after ANDs
- substitution = "SET_UNION(SET_INTERSECT($1, $2), SET_INTERSECT($1, $3))";
- whereClause =
- predicateStrings.get(0) + " and " + predicateStrings.get(1) + " or " + predicateStrings.get(0) + " and "
- + predicateStrings.get(2);
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, false);
- distinctQuery = buildQuery(whereClause, null, null, null, groupBy, false);
- testQuery(tsQuery, distinctQuery, groupBy, sql, false);
-
- tsQuery = buildQuery(whereClause, thetaSketchParams, predicateStrings, substitution, groupBy, true);
- testQuery(tsQuery, distinctQuery, groupBy, sql, true);
+ predicates = Arrays.asList("colA in (1, 2)", "colB not in (3.0)", "colC between 'colC_1' and 'colC_5'");
+ postAggregationExpression = "SET_UNION(SET_INTERSECT($1, $2), SET_INTERSECT($1, $3))";
+ filter = '(' + predicates.get(0) + " and " + predicates.get(1) + ") or (" + predicates.get(0) + " and " + predicates
+ .get(2) + ')';
+ testThetaSketch(parameters, predicates, postAggregationExpression, filter, groupBy, sql);
+ }
+
+ /**
+ * Runs the DistinctCountThetaSketch query and the RawThetaSketch queries (on both THETA_SKETCH_COLUMN and
+ * DISTINCT_COLUMN) for the given arguments, and checks each of them against the DistinctCount query built with the
+ * same filter.
+ */
+ private void testThetaSketch(String parameters, List<String> predicates, String postAggregationExpression,
+ String filter, boolean groupBy, boolean sql) {
+ // The DistinctCount query with the same filter provides the expected value for every query below
+ String expectedQuery = buildDistinctCountQuery(filter, groupBy);
+
+ // Non-raw result
+ testQuery(buildDistinctCountThetaSketchQuery(parameters, predicates, postAggregationExpression, filter, groupBy),
+ expectedQuery, groupBy, sql, false);
+
+ // Raw results, aggregating on each of the two columns
+ for (String column : Arrays.asList(THETA_SKETCH_COLUMN, DISTINCT_COLUMN)) {
+ testQuery(buildRawThetaSketchQuery(column, parameters, filter, groupBy), expectedQuery, groupBy, sql, true);
+ }
}
private void testQuery(String tsQuery, String distinctQuery, boolean groupBy, boolean sql, boolean raw) {
@@ -229,111 +216,86 @@ public class DistinctCountThetaSketchTest extends BaseQueriesTest {
}
}
- private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
- boolean raw) {
- List<AggregationResult> actualResults = actualResponse.getAggregationResults();
- Assert.assertEquals(actualResults.size(), 1);
- double actual = getSketchValue((String) actualResults.get(0).getValue(), raw);
-
- List<AggregationResult> expectedResults = expectedResponse.getAggregationResults();
- double expected = Double.parseDouble((String) expectedResults.get(0).getValue());
-
- Assert.assertEquals(actual, expected, (expected * 0.1), // Allow for 10 % error.
- "Distinct count mismatch: actual: " + actual + "expected: " + expected + "seed:" + RANDOM_SEED);
- }
-
private void compareSql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse, boolean raw) {
List<Object[]> actualRows = actualResponse.getResultTable().getRows();
List<Object[]> expectedRows = expectedResponse.getResultTable().getRows();
-
- Assert.assertEquals(actualRows.size(), expectedRows.size());
-
- for (int i = 0; i < actualRows.size(); i++) {
- double actual = getSketchValue(actualRows.get(i)[0].toString(), raw);
- double expected = (Integer) expectedRows.get(i)[0];
- Assert.assertEquals(actual, expected);
+ int numRows = actualRows.size();
+ assertEquals(numRows, expectedRows.size(), ERROR_MESSAGE);
+ for (int i = 0; i < numRows; i++) {
+ int actual = getSketchValue(actualRows.get(i)[0].toString(), raw);
+ int expected = (int) expectedRows.get(i)[0];
+ assertEquals(actual, expected, ERROR_MESSAGE);
}
}
- private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+ private void compareAggregationPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
boolean raw) {
- AggregationResult actualResult = actualResponse.getAggregationResults().get(0);
- List<GroupByResult> actualGroupBy = actualResult.getGroupByResult();
-
- AggregationResult expectedResult = expectedResponse.getAggregationResults().get(0);
- List<GroupByResult> expectedGroupBy = expectedResult.getGroupByResult();
-
- Assert.assertEquals(actualGroupBy.size(), expectedGroupBy.size());
- for (int i = 0; i < actualGroupBy.size(); i++) {
- double actual = getSketchValue((String) actualGroupBy.get(i).getValue(), raw);
- double expected = Double.parseDouble((String) expectedGroupBy.get(i).getValue());
+ int actual = getSketchValue((String) actualResponse.getAggregationResults().get(0).getValue(), raw);
+ int expected = Integer.parseInt((String) expectedResponse.getAggregationResults().get(0).getValue());
+ assertEquals(actual, expected, ERROR_MESSAGE);
+ }
- Assert.assertEquals(actual, expected, (expected * 0.1), // Allow for 10 % error.
- "Distinct count mismatch: actual: " + actual + "expected: " + expected + "seed:" + RANDOM_SEED);
+ /**
+ * Compares the group-by results of the first aggregation in the two PQL responses. The number of groups must match,
+ * and groups are compared position by position: the actual value is converted via getSketchValue() and must equal
+ * the expected value parsed as an integer.
+ */
+ private void compareGroupByPql(BrokerResponseNative actualResponse, BrokerResponseNative expectedResponse,
+ boolean raw) {
+ List<GroupByResult> actualGroups = actualResponse.getAggregationResults().get(0).getGroupByResult();
+ List<GroupByResult> expectedGroups = expectedResponse.getAggregationResults().get(0).getGroupByResult();
+ assertEquals(actualGroups.size(), expectedGroups.size(), ERROR_MESSAGE);
+ int groupIndex = 0;
+ for (GroupByResult actualGroup : actualGroups) {
+ int actualCount = getSketchValue((String) actualGroup.getValue(), raw);
+ int expectedCount = Integer.parseInt((String) expectedGroups.get(groupIndex++).getValue());
+ assertEquals(actualCount, expectedCount, ERROR_MESSAGE);
}
}
- private double getSketchValue(String value, boolean raw) {
+ private int getSketchValue(String value, boolean raw) {
if (!raw) {
- return Double.parseDouble(value);
+ return Integer.parseInt(value);
+ } else {
+ byte[] bytes = Base64.getDecoder().decode(value);
+ return (int) Math.round(Sketch.wrap(Memory.wrap(bytes)).getEstimate());
}
-
- byte[] bytes = BytesUtils.toBytes(value);
- return Sketch.wrap(Memory.wrap(bytes)).getEstimate();
}
- private String buildQuery(String whereClause, String thetaSketchParams, List<String> thetaSketchPredicates,
- String postAggregationExpression, boolean groupBy, boolean raw) {
- String column;
- String aggrFunction;
- boolean thetaSketch = (postAggregationExpression != null);
-
- if (thetaSketch) {
- aggrFunction = (raw) ? AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH.getName()
- : AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName();
- column = THETA_SKETCH_COLUMN;
- } else {
- aggrFunction = AggregationFunctionType.DISTINCTCOUNT.getName();
- column = DISTINCT_COLUMN;
+ private String buildDistinctCountThetaSketchQuery(String parameters, List<String> predicates,
+ String postAggregationExpression, String filter, boolean groupBy) {
+ StringBuilder stringBuilder =
+ new StringBuilder("select ").append(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH.getName()).append('(')
+ .append(THETA_SKETCH_COLUMN).append(",'").append(parameters).append("','");
+ for (String predicate : predicates) {
+ stringBuilder.append(predicate.replace("'", "''")).append("','");
}
+ stringBuilder.append(postAggregationExpression.replace("'", "''")).append("')");
- StringBuilder sb = new StringBuilder("select ");
- sb.append(aggrFunction);
- sb.append("(");
- sb.append(column);
-
- if (thetaSketch) {
- sb.append(", ");
-
- sb.append("'");
- if (thetaSketchParams != null) {
- sb.append(thetaSketchParams.replace("'", "''"));
- }
- sb.append("', ");
-
- for (String predicate : thetaSketchPredicates) {
- sb.append('\'');
- sb.append(predicate.replace("'", "''"));
- sb.append('\'');
- sb.append(", ");
- }
-
- sb.append('\'');
- sb.append(postAggregationExpression.replace("'", "''"));
- sb.append('\'');
+ stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
+ if (groupBy) {
+ stringBuilder.append(" group by colA, colB");
}
+ return stringBuilder.toString();
+ }
- sb.append(") from ");
-
- sb.append(TABLE_NAME);
- sb.append(" where ");
- sb.append(whereClause);
+ private String buildRawThetaSketchQuery(String column, String parameters, String filter, boolean groupBy) {
+ StringBuilder stringBuilder =
+ new StringBuilder("select ").append(AggregationFunctionType.RAWTHETASKETCH.getName()).append('(').append(column)
+ .append(",'").append(parameters).append("')");
+ stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
if (groupBy) {
- sb.append(" group by colA, colB");
+ stringBuilder.append(" group by colA, colB");
}
+ return stringBuilder.toString();
+ }
+
+ private String buildDistinctCountQuery(String filter, boolean groupBy) {
+ StringBuilder stringBuilder =
+ new StringBuilder("select ").append(AggregationFunctionType.DISTINCTCOUNT.getName()).append('(')
+ .append(DISTINCT_COLUMN).append(')');
- return sb.toString();
+ stringBuilder.append(" from ").append(TABLE_NAME).append(" where ").append(filter);
+ if (groupBy) {
+ stringBuilder.append(" group by colA, colB");
+ }
+ return stringBuilder.toString();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org