You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this plain-text rendering.)
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 22:31:58 UTC
[pinot] 01/01: Revert "Add Variance and Standard Deviation Aggregation Functions (#9910)"
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch revert-9910-add-variance-function
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 48bed903895c6e75a7f5e031c2d4ff2b20f5788b
Author: Seunghyun Lee <sn...@apache.org>
AuthorDate: Thu Dec 8 14:31:51 2022 -0800
Revert "Add Variance and Standard Deviation Aggregation Functions (#9910)"
This reverts commit 852477bdf312607c9a0eba7278453b7957139466.
---
.../apache/pinot/core/common/ObjectSerDeUtils.java | 26 +-
.../function/AggregationFunctionFactory.java | 8 -
.../function/CovarianceAggregationFunction.java | 31 +-
.../function/VarianceAggregationFunction.java | 188 ------
.../utils/StatisticalAggregationFunctionUtils.java | 53 --
.../org/apache/pinot/queries/BaseQueriesTest.java | 16 +-
.../pinot/queries/CovarianceQueriesTest.java | 461 +++++++++++++
.../pinot/queries/StatisticalQueriesTest.java | 749 ---------------------
.../segment/local/customobject/VarianceTuple.java | 105 ---
.../pinot/segment/spi/AggregationFunctionType.java | 4 -
10 files changed, 495 insertions(+), 1146 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 6fbacc5fcd..e22a2e7fb0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -72,7 +72,6 @@ import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
@@ -124,8 +123,7 @@ public class ObjectSerDeUtils {
FloatLongPair(29),
DoubleLongPair(30),
StringLongPair(31),
- CovarianceTuple(32),
- VarianceTuple(33);
+ CovarianceTuple(32);
private final int _value;
@@ -207,8 +205,6 @@ public class ObjectSerDeUtils {
return ObjectType.StringLongPair;
} else if (value instanceof CovarianceTuple) {
return ObjectType.CovarianceTuple;
- } else if (value instanceof VarianceTuple) {
- return ObjectType.VarianceTuple;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
@@ -466,23 +462,6 @@ public class ObjectSerDeUtils {
}
};
- public static final ObjectSerDe<VarianceTuple> VARIANCE_TUPLE_OBJECT_SER_DE = new ObjectSerDe<VarianceTuple>() {
- @Override
- public byte[] serialize(VarianceTuple varianceTuple) {
- return varianceTuple.toBytes();
- }
-
- @Override
- public VarianceTuple deserialize(byte[] bytes) {
- return VarianceTuple.fromBytes(bytes);
- }
-
- @Override
- public VarianceTuple deserialize(ByteBuffer byteBuffer) {
- return VarianceTuple.fromByteBuffer(byteBuffer);
- }
- };
-
public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {
@Override
@@ -1212,8 +1191,7 @@ public class ObjectSerDeUtils {
FLOAT_LONG_PAIR_SER_DE,
DOUBLE_LONG_PAIR_SER_DE,
STRING_LONG_PAIR_SER_DE,
- COVARIANCE_TUPLE_OBJECT_SER_DE,
- VARIANCE_TUPLE_OBJECT_SER_DE
+ COVARIANCE_TUPLE_OBJECT_SER_DE
};
//@formatter:on
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 2758d193a2..4e1bda2025 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -277,14 +277,6 @@ public class AggregationFunctionFactory {
return new BooleanAndAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
case BOOLOR:
return new BooleanOrAggregationFunction(firstArgument, queryContext.isNullHandlingEnabled());
- case VARPOP:
- return new VarianceAggregationFunction(firstArgument, false, false);
- case VARSAMP:
- return new VarianceAggregationFunction(firstArgument, true, false);
- case STDDEVPOP:
- return new VarianceAggregationFunction(firstArgument, false, true);
- case STDDEVSAMP:
- return new VarianceAggregationFunction(firstArgument, true, true);
default:
throw new IllegalArgumentException();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
index 1676835bae..bd68235c48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CovarianceAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,6 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
import org.apache.pinot.segment.local.customobject.CovarianceTuple;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -99,8 +99,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
@Override
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
- double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+ double[] values1 = getValSet(blockValSetMap, _expression1);
+ double[] values2 = getValSet(blockValSetMap, _expression2);
double sumX = 0.0;
double sumY = 0.0;
@@ -134,11 +134,28 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
}
}
+ private double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) {
+ BlockValSet blockValSet = blockValSetMap.get(expression);
+ //TODO: Add MV support for covariance
+ Preconditions.checkState(blockValSet.isSingleValue(),
+ "Covariance function currently only supports single-valued column");
+ switch (blockValSet.getValueType().getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return blockValSet.getDoubleValuesSV();
+ default:
+ throw new IllegalStateException(
+ "Cannot compute covariance for non-numeric type: " + blockValSet.getValueType());
+ }
+ }
+
@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
- double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+ double[] values1 = getValSet(blockValSetMap, _expression1);
+ double[] values2 = getValSet(blockValSetMap, _expression2);
for (int i = 0; i < length; i++) {
setGroupByResult(groupKeyArray[i], groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L);
}
@@ -147,8 +164,8 @@ public class CovarianceAggregationFunction implements AggregationFunction<Covari
@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values1 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression1);
- double[] values2 = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression2);
+ double[] values1 = getValSet(blockValSetMap, _expression1);
+ double[] values2 = getValSet(blockValSetMap, _expression2);
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
setGroupByResult(groupKey, groupByResultHolder, values1[i], values2[i], values1[i] * values2[i], 1L);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java
deleted file mode 100644
index c86269b7ce..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/VarianceAggregationFunction.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.function;
-
-import java.util.Map;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
-import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
-import org.apache.pinot.segment.spi.AggregationFunctionType;
-
-
-/**
- * Aggregation function which computes Variance and Standard Deviation
- *
- * The algorithm to compute variance is based on "Updating Formulae and a Pairwise Algorithm for Computing
- * Sample Variances" by Chan et al. Please refer to the "Parallel Algorithm" section from the following wiki:
- * - https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
- */
-public class VarianceAggregationFunction extends BaseSingleInputAggregationFunction<VarianceTuple, Double> {
- private static final double DEFAULT_FINAL_RESULT = Double.NEGATIVE_INFINITY;
- protected final boolean _isSample;
-
- protected final boolean _isStdDev;
-
- public VarianceAggregationFunction(ExpressionContext expression, boolean isSample, boolean isStdDev) {
- super(expression);
- _isSample = isSample;
- _isStdDev = isStdDev;
- }
-
- @Override
- public AggregationFunctionType getType() {
- if (_isSample) {
- return (_isStdDev) ? AggregationFunctionType.STDDEVSAMP : AggregationFunctionType.VARSAMP;
- }
- return (_isStdDev) ? AggregationFunctionType.STDDEVPOP : AggregationFunctionType.VARPOP;
- }
-
- @Override
- public AggregationResultHolder createAggregationResultHolder() {
- return new ObjectAggregationResultHolder();
- }
-
- @Override
- public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
- return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
- }
-
- @Override
- public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
-
- long count = 0;
- double sum = 0.0;
- double variance = 0.0;
- for (int i = 0; i < length; i++) {
- count++;
- sum += values[i];
- if (count > 1) {
- variance = computeIntermediateVariance(count, sum, variance, values[i]);
- }
- }
- setAggregationResult(aggregationResultHolder, length, sum, variance);
- }
-
- private double computeIntermediateVariance(long count, double sum, double m2, double value) {
- double t = count * value - sum;
- m2 += (t * t) / (count * (count - 1));
- return m2;
- }
-
- protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, long count, double sum,
- double m2) {
- VarianceTuple varianceTuple = aggregationResultHolder.getResult();
- if (varianceTuple == null) {
- aggregationResultHolder.setValue(new VarianceTuple(count, sum, m2));
- } else {
- varianceTuple.apply(count, sum, m2);
- }
- }
-
- protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, long count, double sum,
- double m2) {
- VarianceTuple varianceTuple = groupByResultHolder.getResult(groupKey);
- if (varianceTuple == null) {
- groupByResultHolder.setValueForKey(groupKey, new VarianceTuple(count, sum, m2));
- } else {
- varianceTuple.apply(count, sum, m2);
- }
- }
-
- @Override
- public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
- for (int i = 0; i < length; i++) {
- setGroupByResult(groupKeyArray[i], groupByResultHolder, 1L, values[i], 0.0);
- }
- }
-
- @Override
- public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
- Map<ExpressionContext, BlockValSet> blockValSetMap) {
- double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
- for (int i = 0; i < length; i++) {
- for (int groupKey : groupKeysArray[i]) {
- setGroupByResult(groupKey, groupByResultHolder, 1L, values[i], 0.0);
- }
- }
- }
-
- @Override
- public VarianceTuple extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
- VarianceTuple varianceTuple = aggregationResultHolder.getResult();
- if (varianceTuple == null) {
- return new VarianceTuple(0L, 0.0, 0.0);
- } else {
- return varianceTuple;
- }
- }
-
- @Override
- public VarianceTuple extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
- return groupByResultHolder.getResult(groupKey);
- }
-
- @Override
- public VarianceTuple merge(VarianceTuple intermediateResult1, VarianceTuple intermediateResult2) {
- intermediateResult1.apply(intermediateResult2);
- return intermediateResult1;
- }
-
- @Override
- public DataSchema.ColumnDataType getIntermediateResultColumnType() {
- return DataSchema.ColumnDataType.OBJECT;
- }
-
- @Override
- public DataSchema.ColumnDataType getFinalResultColumnType() {
- return DataSchema.ColumnDataType.DOUBLE;
- }
-
- @Override
- public Double extractFinalResult(VarianceTuple varianceTuple) {
- if (varianceTuple == null) {
- return null;
- }
- long count = varianceTuple.getCount();
- if (count == 0L) {
- return DEFAULT_FINAL_RESULT;
- } else {
- double variance = varianceTuple.getM2();
- if (_isSample) {
- if (count - 1 == 0L) {
- return DEFAULT_FINAL_RESULT;
- }
- double sampleVar = variance / (count - 1);
- return (_isStdDev) ? Math.sqrt(sampleVar) : sampleVar;
- } else {
- double popVar = variance / count;
- return (_isStdDev) ? Math.sqrt(popVar) : popVar;
- }
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java
deleted file mode 100644
index b7a05de3a4..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/utils/StatisticalAggregationFunctionUtils.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.utils;
-
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-
-
-/**
- * Util class for statistical aggregation functions
- *
- * e.g. Variance, Covariance, Standard Deviation...
- */
-public class StatisticalAggregationFunctionUtils {
- private StatisticalAggregationFunctionUtils() {
- }
-
- public static double[] getValSet(Map<ExpressionContext, BlockValSet> blockValSetMap, ExpressionContext expression) {
- BlockValSet blockValSet = blockValSetMap.get(expression);
- //TODO: Add MV support for covariance
- Preconditions.checkState(blockValSet.isSingleValue(),
- "Variance, Covariance, Standard Deviation function currently only supports single-valued column");
- switch (blockValSet.getValueType().getStoredType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return blockValSet.getDoubleValuesSV();
- default:
- throw new IllegalStateException(
- "Cannot compute variance, covariance, or standard deviation for non-numeric type: "
- + blockValSet.getValueType());
- }
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index fa5a5068df..f359987780 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -111,7 +111,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
protected BrokerResponseNative getBrokerResponse(String query) {
return getBrokerResponse(query, PLAN_MAKER);
@@ -125,7 +125,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
protected BrokerResponseNative getBrokerResponseWithFilter(String query) {
return getBrokerResponse(query + getFilter());
@@ -139,7 +139,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
protected BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker) {
return getBrokerResponse(query, planMaker, null);
@@ -153,7 +153,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
protected BrokerResponseNative getBrokerResponse(String query, @Nullable Map<String, String> extraQueryOptions) {
return getBrokerResponse(query, PLAN_MAKER, extraQueryOptions);
@@ -167,7 +167,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
private BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker,
@Nullable Map<String, String> extraQueryOptions) {
@@ -191,7 +191,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker planMaker) {
PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
@@ -249,7 +249,7 @@ public abstract class BaseQueriesTest {
* In order to query 2 distinct instances, the caller of this function should handle initializing 2 instances with
* different index segments in the test and overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query, @Nullable TableConfig config,
@Nullable Schema schema) {
@@ -286,7 +286,7 @@ public abstract class BaseQueriesTest {
* The caller of this function should handle initializing 2 instances with different index segments in the test and
* overriding getDistinctInstances.
* This can be particularly useful to test statistical aggregation functions.
- * @see StatisticalQueriesTest for an example use case.
+ * @see CovarianceQueriesTest for an example use case.
*/
private BrokerResponseNative getBrokerResponseDistinctInstances(PinotQuery pinotQuery, PlanMaker planMaker) {
PinotQuery serverPinotQuery = GapfillUtils.stripGapfill(pinotQuery);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java
new file mode 100644
index 0000000000..0ab86180e8
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/CovarianceQueriesTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.stat.correlation.Covariance;
+import org.apache.commons.math3.util.Precision;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.operator.query.GroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.segment.local.customobject.CovarianceTuple;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for covariance queries.
+ */
+public class CovarianceQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ // test segments 1-4 evenly divide testSegment into 4 distinct segments
+ private static final String SEGMENT_NAME_1 = "testSegment1";
+ private static final String SEGMENT_NAME_2 = "testSegment2";
+ private static final String SEGMENT_NAME_3 = "testSegment3";
+ private static final String SEGMENT_NAME_4 = "testSegment4";
+
+ private static final int NUM_RECORDS = 2000;
+ private static final int NUM_GROUPS = 10;
+ private static final int MAX_VALUE = 500;
+ private static final double RELATIVE_EPSILON = 0.0001;
+ private static final double DELTA = 0.0001;
+
+ private static final String INT_COLUMN_X = "intColumnX";
+ private static final String INT_COLUMN_Y = "intColumnY";
+ private static final String DOUBLE_COLUMN_X = "doubleColumnX";
+ private static final String DOUBLE_COLUMN_Y = "doubleColumnY";
+ private static final String LONG_COLUMN = "longColumn";
+ private static final String FLOAT_COLUMN = "floatColumn";
+ private static final String GROUP_BY_COLUMN = "groupByColumn";
+
+ private static final Schema SCHEMA =
+ new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT)
+ .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE)
+ .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE)
+ .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+ .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build();
+ private static final TableConfig TABLE_CONFIG =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+ private List<List<IndexSegment>> _distinctInstances;
+ private int _sumIntX = 0;
+ private int _sumIntY = 0;
+ private int _sumIntXY = 0;
+
+ private double _sumDoubleX = 0;
+ private double _sumDoubleY = 0;
+ private double _sumDoubleXY = 0;
+
+ private long _sumLong = 0L;
+ private double _sumFloat = 0;
+
+ private double _sumIntDouble = 0;
+ private long _sumIntLong = 0L;
+ private double _sumIntFloat = 0;
+ private double _sumDoubleLong = 0;
+ private double _sumDoubleFloat = 0;
+ private double _sumLongFloat = 0;
+
+ private double _expectedCovIntXY;
+ private double _expectedCovDoubleXY;
+ private double _expectedCovIntDouble;
+ private double _expectedCovIntLong;
+ private double _expectedCovIntFloat;
+ private double _expectedCovDoubleLong;
+ private double _expectedCovDoubleFloat;
+ private double _expectedCovLongFloat;
+
+ private double _expectedCovWithFilter;
+
+ private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS];
+ private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS];
+ private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS];
+ private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS];
+
+ private boolean _useIdenticalSegment = false;
+
+ @Override
+ protected String getFilter() {
+ // filter out half of the rows based on group id
+ return " WHERE groupByColumn < " + (NUM_GROUPS / 2);
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @Override
+ protected List<List<IndexSegment>> getDistinctInstances() {
+ if (_useIdenticalSegment) {
+ return Collections.singletonList(_indexSegments);
+ }
+ return _distinctInstances;
+ }
+
+ /**
+ * Generates NUM_RECORDS random rows and precomputes every expected value. These are the raw sums for
+ * the inner-segment CovarianceTuples, the population covariances for broker results, and the per-group
+ * tuples/covariances. It then builds the full test segment plus four distinct quarter segments, split
+ * across two instances.
+ */
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+
+ // Use an explicit, printed seed. A failure triggered by one particular random data set can then be
+ // reproduced by temporarily hard-coding that seed here.
+ long seed = System.currentTimeMillis();
+ System.out.println("CovarianceQueriesTest random seed: " + seed);
+ Random rand = new Random(seed);
+ int[] intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+ int[] intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+ double[] doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+ double[] doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+ long[] longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
+ double[] floatCol = new double[NUM_RECORDS];
+ double[] groupByCol = new double[NUM_RECORDS];
+
+ int groupSize = NUM_RECORDS / NUM_GROUPS;
+ // running sums for the group currently being filled
+ double sumX = 0;
+ double sumY = 0;
+ double sumGroupBy = 0;
+ double sumXY = 0;
+ double sumXGroupBy = 0;
+ int groupByVal = 0;
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ GenericRow record = new GenericRow();
+ int intX = intColX[i];
+ int intY = intColY[i];
+ double doubleX = doubleColX[i];
+ double doubleY = doubleColY[i];
+ long longVal = longCol[i];
+ float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE;
+
+ // set up inner segment group by results; i is non-negative, so integer division already floors
+ groupByVal = i / groupSize;
+ if (i % groupSize == 0 && groupByVal > 0) {
+ // a new group starts: store the finished previous group's sums and reset the accumulators
+ _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
+ _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
+ sumX = 0;
+ sumY = 0;
+ sumGroupBy = 0;
+ sumXY = 0;
+ sumXGroupBy = 0;
+ }
+
+ sumX += doubleX;
+ sumY += doubleY;
+ sumGroupBy += groupByVal;
+ sumXY += doubleX * doubleY;
+ sumXGroupBy += doubleX * groupByVal;
+
+ floatCol[i] = floatVal;
+ groupByCol[i] = groupByVal;
+
+ // calculate inner segment results
+ _sumIntX += intX;
+ _sumIntY += intY;
+ _sumDoubleX += doubleX;
+ _sumDoubleY += doubleY;
+ _sumLong += longVal;
+ _sumFloat += floatVal;
+ _sumIntXY += intX * intY;
+ _sumDoubleXY += doubleX * doubleY;
+ _sumIntDouble += intX * doubleX;
+ _sumIntLong += intX * longVal;
+ _sumIntFloat += intX * floatCol[i];
+ _sumDoubleLong += doubleX * longVal;
+ _sumDoubleFloat += doubleX * floatCol[i];
+ _sumLongFloat += longVal * floatCol[i];
+
+ record.putValue(INT_COLUMN_X, intX);
+ record.putValue(INT_COLUMN_Y, intY);
+ record.putValue(DOUBLE_COLUMN_X, doubleX);
+ record.putValue(DOUBLE_COLUMN_Y, doubleY);
+ record.putValue(LONG_COLUMN, longVal);
+ record.putValue(FLOAT_COLUMN, floatVal);
+ record.putValue(GROUP_BY_COLUMN, groupByVal);
+ records.add(record);
+ }
+ // the last group is never closed inside the loop
+ _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
+ _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
+
+ // calculate inter segment result (population covariance: biasCorrected = false)
+ Covariance cov = new Covariance();
+ double[] newIntColX = Arrays.stream(intColX).asDoubleStream().toArray();
+ double[] newIntColY = Arrays.stream(intColY).asDoubleStream().toArray();
+ double[] newLongCol = Arrays.stream(longCol).asDoubleStream().toArray();
+ _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false);
+ _expectedCovDoubleXY = cov.covariance(doubleColX, doubleColY, false);
+ _expectedCovIntDouble = cov.covariance(newIntColX, doubleColX, false);
+ _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false);
+ _expectedCovIntFloat = cov.covariance(newIntColX, floatCol, false);
+ _expectedCovDoubleLong = cov.covariance(doubleColX, newLongCol, false);
+ _expectedCovDoubleFloat = cov.covariance(doubleColX, floatCol, false);
+ _expectedCovLongFloat = cov.covariance(newLongCol, floatCol, false);
+
+ // getFilter() keeps the first half of the groups, i.e. the first half of the rows
+ double[] filteredX = Arrays.copyOfRange(doubleColX, 0, NUM_RECORDS / 2);
+ double[] filteredY = Arrays.copyOfRange(doubleColY, 0, NUM_RECORDS / 2);
+ _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false);
+
+ // calculate inter segment group by results
+ for (int i = 0; i < NUM_GROUPS; i++) {
+ double[] colX = Arrays.copyOfRange(doubleColX, i * groupSize, (i + 1) * groupSize);
+ double[] colGroupBy = Arrays.copyOfRange(groupByCol, i * groupSize, (i + 1) * groupSize);
+ double[] colY = Arrays.copyOfRange(doubleColY, i * groupSize, (i + 1) * groupSize);
+ _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false);
+ _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false);
+ }
+
+ // generate testSegment
+ ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+
+ // divide testSegment into 4 distinct segments for distinct inter segment tests
+ // by doing so, we can avoid calculating global covariance again
+ _distinctInstances = new ArrayList<>();
+ int segmentSize = NUM_RECORDS / 4;
+ ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1);
+ ImmutableSegment immutableSegment2 =
+ setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2);
+ ImmutableSegment immutableSegment3 =
+ setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3);
+ ImmutableSegment immutableSegment4 =
+ setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4);
+ // generate 2 instances each with 2 distinct segments
+ _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2));
+ _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4));
+ }
+
+ /**
+ * Builds an on-disk segment named {@code segmentName} under INDEX_DIR from {@code recordSet}. The
+ * segment is then loaded back with ReadMode.mmap.
+ */
+ private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
+ throws Exception {
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ config.setTableName(RAW_TABLE_NAME);
+ config.setSegmentName(segmentName);
+ config.setOutDir(INDEX_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl creationDriver = new SegmentIndexCreationDriverImpl();
+ creationDriver.init(config, new GenericRowRecordReader(recordSet));
+ creationDriver.build();
+
+ File segmentDir = new File(INDEX_DIR, segmentName);
+ return ImmutableSegmentLoader.load(segmentDir, ReadMode.mmap);
+ }
+
+ /**
+ * Checks COVAR_POP over every numeric column pairing at three levels:
+ * <ul>
+ * <li>the inner-segment intermediate CovarianceTuples (raw sums and count);</li>
+ * <li>broker results over identical and over distinct segments;</li>
+ * <li>a filtered broker query.</li>
+ * </ul>
+ * _useIdenticalSegment is always reset in a finally block, so a failing query cannot leak it into
+ * other tests.
+ */
+ @Test
+ public void testAggregationOnly() {
+ // Inner Segment
+ String query =
+ "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, "
+ + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), "
+ + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, "
+ + "floatColumn) FROM testTable";
+ AggregationOperator aggregationOperator = getOperator(query);
+ AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
+ // the query reads 6 distinct columns
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+ NUM_RECORDS * 6, NUM_RECORDS);
+ List<Object> aggregationResult = resultsBlock.getResults();
+ assertNotNull(aggregationResult);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong, NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat,
+ NUM_RECORDS);
+ checkWithPrecision((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat, NUM_RECORDS);
+
+ // Inter segments with 4 identical segments (2 instances each having 2 identical segments)
+ BrokerResponseNative brokerResponse;
+ _useIdenticalSegment = true;
+ try {
+ brokerResponse = getBrokerResponse(query);
+ } finally {
+ _useIdenticalSegment = false;
+ }
+ assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS);
+ assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+ checkResultTableWithPrecision(brokerResponse);
+
+ // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments)
+ brokerResponse = getBrokerResponse(query);
+ assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS);
+ assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS);
+ checkResultTableWithPrecision(brokerResponse);
+
+ // Inter segments with 4 identical segments with filter
+ query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter();
+ _useIdenticalSegment = true;
+ try {
+ brokerResponse = getBrokerResponse(query);
+ } finally {
+ _useIdenticalSegment = false;
+ }
+ // half of the rows pass the filter; 2 columns are read per matching row
+ assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS);
+ assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+ assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS);
+ assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+ Object[] results = brokerResponse.getResultTable().getRows().get(0);
+ assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON));
+ }
+
+ /**
+ * Checks grouped COVAR_POP in two cases:
+ * <ol>
+ * <li>against the group-by column itself, where every covariance is 0;</li>
+ * <li>against an independent random column.</li>
+ * </ol>
+ * Each case is verified at the inner-segment level (CovarianceTuples) and through the broker over
+ * identical and over distinct segments. _useIdenticalSegment is reset in a finally block, so a failed
+ * assertion or query cannot leak it into other tests.
+ */
+ @Test
+ public void testAggregationGroupBy() {
+
+ // Inner Segment
+ // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's
+ String query =
+ "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+ GroupByOperator groupByOperator = getOperator(query);
+ GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+ NUM_RECORDS * 2, NUM_RECORDS);
+ AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+ assertNotNull(aggregationGroupByResult);
+ for (int i = 0; i < NUM_GROUPS; i++) {
+ CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
+ CovarianceTuple expectedCovTuple = _expectedGroupByResultVer1[i];
+ checkWithPrecision(actualCovTuple, expectedCovTuple);
+ }
+
+ // Inter Segment with 4 identical segments
+ BrokerResponseNative brokerResponse;
+ _useIdenticalSegment = true;
+ try {
+ brokerResponse = getBrokerResponse(query);
+ } finally {
+ _useIdenticalSegment = false;
+ }
+ checkGroupByResults(brokerResponse, _expectedFinalResultVer1);
+ // Inter Segment with 4 distinct segments
+ brokerResponse = getBrokerResponse(query);
+ checkGroupByResults(brokerResponse, _expectedFinalResultVer1);
+
+ // Inner Segment
+ // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov
+ query =
+ "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
+ groupByOperator = getOperator(query);
+ resultsBlock = groupByOperator.nextBlock();
+ QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+ NUM_RECORDS * 3, NUM_RECORDS);
+ aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+ assertNotNull(aggregationGroupByResult);
+
+ for (int i = 0; i < NUM_GROUPS; i++) {
+ CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
+ CovarianceTuple expectedCovTuple = _expectedGroupByResultVer2[i];
+ checkWithPrecision(actualCovTuple, expectedCovTuple);
+ }
+
+ // Inter Segment with 4 identical segments
+ _useIdenticalSegment = true;
+ try {
+ brokerResponse = getBrokerResponse(query);
+ } finally {
+ _useIdenticalSegment = false;
+ }
+ checkGroupByResults(brokerResponse, _expectedFinalResultVer2);
+ // Inter Segment with 4 distinct segments
+ brokerResponse = getBrokerResponse(query);
+ checkGroupByResults(brokerResponse, _expectedFinalResultVer2);
+ }
+
+ /**
+ * Asserts that an intermediate CovarianceTuple has exactly {@code count} rows. Its sumX, sumY and sumXY
+ * must match the expected values within RELATIVE_EPSILON, and each failure message reports both
+ * values.
+ */
+ private void checkWithPrecision(CovarianceTuple tuple, double sumX, double sumY, double sumXY, int count) {
+ assertEquals(tuple.getCount(), count);
+ assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON),
+ "sumX: expected " + sumX + " but was " + tuple.getSumX());
+ assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON),
+ "sumY: expected " + sumY + " but was " + tuple.getSumY());
+ assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON),
+ "sumXY: expected " + sumXY + " but was " + tuple.getSumXY());
+ }
+
+ /** Compares two CovarianceTuples field by field, using the expected tuple's sums and count. */
+ private void checkWithPrecision(CovarianceTuple actual, CovarianceTuple expected) {
+ int expectedCount = (int) expected.getCount();
+ double expectedSumX = expected.getSumX();
+ double expectedSumY = expected.getSumY();
+ double expectedSumXY = expected.getSumXY();
+ checkWithPrecision(actual, expectedSumX, expectedSumY, expectedSumXY, expectedCount);
+ }
+
+ /**
+ * Asserts that the single result row of the 8-column COVAR_POP query matches the population
+ * covariances precomputed in setUp. Values are compared within RELATIVE_EPSILON, in the query's column
+ * order, and each failure message names the offending column.
+ */
+ private void checkResultTableWithPrecision(BrokerResponseNative brokerResponse) {
+ Object[] results = brokerResponse.getResultTable().getRows().get(0);
+ // same order as the SELECT list in testAggregationOnly
+ double[] expectedCovariances = {
+ _expectedCovIntXY, _expectedCovDoubleXY, _expectedCovIntDouble, _expectedCovIntLong, _expectedCovIntFloat,
+ _expectedCovDoubleLong, _expectedCovDoubleFloat, _expectedCovLongFloat
+ };
+ assertEquals(results.length, expectedCovariances.length);
+ for (int i = 0; i < expectedCovariances.length; i++) {
+ double actual = (double) results[i];
+ assertTrue(Precision.equalsWithRelativeTolerance(actual, expectedCovariances[i], RELATIVE_EPSILON),
+ "Column " + i + ": expected " + expectedCovariances[i] + " but was " + actual);
+ }
+ }
+
+ /**
+ * Asserts that a grouped COVAR_POP broker response has one row per group, matching
+ * {@code expectedResults} within the absolute tolerance DELTA. An absolute tolerance is used because
+ * some expected covariances are exactly 0. Row i is matched to group i, which relies on the queries'
+ * ORDER BY groupByColumn.
+ */
+ private void checkGroupByResults(BrokerResponseNative brokerResponse, double[] expectedResults) {
+ ResultTable resultTable = brokerResponse.getResultTable();
+ List<Object[]> rows = resultTable.getRows();
+ // check the row count first so a missing or extra group fails with a clear message
+ assertEquals(rows.size(), NUM_GROUPS);
+ for (int i = 0; i < NUM_GROUPS; i++) {
+ double actual = (double) rows.get(i)[0];
+ assertTrue(Precision.equals(actual, expectedResults[i], DELTA),
+ "Group " + i + ": expected " + expectedResults[i] + " but was " + actual);
+ }
+ }
+
+ /**
+ * Destroys every loaded segment and deletes INDEX_DIR. It runs even after a failed setUp
+ * (alwaysRun), so the null checks cover a partially built fixture.
+ */
+ @AfterClass(alwaysRun = true)
+ public void tearDown()
+ throws IOException {
+ if (_indexSegment != null) {
+ _indexSegment.destroy();
+ }
+ if (_distinctInstances != null) {
+ for (List<IndexSegment> instance : _distinctInstances) {
+ for (IndexSegment segment : instance) {
+ segment.destroy();
+ }
+ }
+ }
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
deleted file mode 100644
index d61dcaa18f..0000000000
--- a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java
+++ /dev/null
@@ -1,749 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.queries;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.math3.stat.correlation.Covariance;
-import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
-import org.apache.commons.math3.stat.descriptive.moment.Variance;
-import org.apache.commons.math3.util.Precision;
-import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
-import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
-import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.operator.query.GroupByOperator;
-import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
-import org.apache.pinot.segment.local.customobject.CovarianceTuple;
-import org.apache.pinot.segment.local.customobject.VarianceTuple;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.ImmutableSegment;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Queries test for statistical queries (i.e Variance, Covariance, Standard Deviation etc)
- */
-public class StatisticalQueriesTest extends BaseQueriesTest {
- // NOTE(review): the directory name is "CovarianceQueriesTest", not this class's name. If that test
- // uses the same name, the two share (and delete) one temp directory, so confirm before running both
- // concurrently.
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "CovarianceQueriesTest");
- private static final String RAW_TABLE_NAME = "testTable";
- private static final String SEGMENT_NAME = "testSegment";
-
- // test segments 1-4 evenly divide testSegment into 4 distinct segments
- private static final String SEGMENT_NAME_1 = "testSegment1";
- private static final String SEGMENT_NAME_2 = "testSegment2";
- private static final String SEGMENT_NAME_3 = "testSegment3";
- private static final String SEGMENT_NAME_4 = "testSegment4";
-
- private static final int NUM_RECORDS = 2000;
- private static final int NUM_GROUPS = 10;
- // generated values lie (roughly) in [-MAX_VALUE, MAX_VALUE)
- private static final int MAX_VALUE = 500;
- private static final double RELATIVE_EPSILON = 0.0001;
- private static final double DELTA = 0.0001;
-
- private static final String INT_COLUMN_X = "intColumnX";
- private static final String INT_COLUMN_Y = "intColumnY";
- private static final String DOUBLE_COLUMN_X = "doubleColumnX";
- private static final String DOUBLE_COLUMN_Y = "doubleColumnY";
- private static final String LONG_COLUMN = "longColumn";
- private static final String FLOAT_COLUMN = "floatColumn";
- private static final String GROUP_BY_COLUMN = "groupByColumn";
-
- private static final Schema SCHEMA =
- new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN_X, FieldSpec.DataType.INT)
- .addSingleValueDimension(INT_COLUMN_Y, FieldSpec.DataType.INT)
- .addSingleValueDimension(DOUBLE_COLUMN_X, FieldSpec.DataType.DOUBLE)
- .addSingleValueDimension(DOUBLE_COLUMN_Y, FieldSpec.DataType.DOUBLE)
- .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
- .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
- .addSingleValueDimension(GROUP_BY_COLUMN, FieldSpec.DataType.DOUBLE).build();
- private static final TableConfig TABLE_CONFIG =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
-
- private IndexSegment _indexSegment;
- private List<IndexSegment> _indexSegments;
- private List<List<IndexSegment>> _distinctInstances;
- // Whole-segment raw sums accumulated in setUp; the inner-segment CovarianceTuples are checked against them.
- // With |value| <= MAX_VALUE and NUM_RECORDS rows, |_sumIntXY| <= 5e8, which fits in an int.
- private int _sumIntX = 0;
- private int _sumIntY = 0;
- private int _sumIntXY = 0;
-
- private double _sumDoubleX = 0;
- private double _sumDoubleY = 0;
- private double _sumDoubleXY = 0;
-
- private long _sumLong = 0L;
- private double _sumFloat = 0;
-
- private double _sumIntDouble = 0;
- private long _sumIntLong = 0L;
- private double _sumIntFloat = 0;
- private double _sumDoubleLong = 0;
- private double _sumDoubleFloat = 0;
- private double _sumLongFloat = 0;
-
- // Expected population covariances (biasCorrected = false), computed in setUp with commons-math Covariance.
- private double _expectedCovIntXY;
- private double _expectedCovDoubleXY;
- private double _expectedCovIntDouble;
- private double _expectedCovIntLong;
- private double _expectedCovIntFloat;
- private double _expectedCovDoubleLong;
- private double _expectedCovDoubleFloat;
- private double _expectedCovLongFloat;
-
- private double _expectedCovWithFilter;
-
- // Per-group expectations: Ver1 is COVAR_POP(doubleColumnX, groupByColumn) and Ver2 is
- // COVAR_POP(doubleColumnX, doubleColumnY).
- private final CovarianceTuple[] _expectedGroupByResultVer1 = new CovarianceTuple[NUM_GROUPS];
- private final CovarianceTuple[] _expectedGroupByResultVer2 = new CovarianceTuple[NUM_GROUPS];
- private final double[] _expectedFinalResultVer1 = new double[NUM_GROUPS];
- private final double[] _expectedFinalResultVer2 = new double[NUM_GROUPS];
-
- // When true, getDistinctInstances() returns a single instance wrapping _indexSegments.
- private boolean _useIdenticalSegment = false;
-
- // Raw generated columns, kept as fields so later tests (e.g. testVarianceAggregationOnly) can recompute
- // expected values. setUp replaces the int/long/double arrays allocated here and fills _floatCol and
- // _groupByCol in place.
- int[] _intColX = new int[NUM_RECORDS];
- int[] _intColY = new int[NUM_RECORDS];
- long[] _longCol = new long[NUM_RECORDS];
- double[] _floatCol = new double[NUM_RECORDS];
- double[] _doubleColX = new double[NUM_RECORDS];
- double[] _doubleColY = new double[NUM_RECORDS];
- double[] _groupByCol = new double[NUM_RECORDS];
-
- /**
- * Keeps only groups below NUM_GROUPS / 2. Because setUp assigns group ids in record order, this is the
- * first half of the rows.
- */
- @Override
- protected String getFilter() {
- // filter out half of the rows based on group id
- return " WHERE groupByColumn < " + (NUM_GROUPS / 2);
- }
-
- /** Supplies the full test segment built in setUp to the base class. */
- @Override
- protected IndexSegment getIndexSegment() {
- return _indexSegment;
- }
-
- /** Supplies the list holding the full test segment twice, as built in setUp. */
- @Override
- protected List<IndexSegment> getIndexSegments() {
- return _indexSegments;
- }
-
- /**
- * Returns a single instance wrapping _indexSegments when _useIdenticalSegment is set. Otherwise it
- * returns the two instances of distinct quarter segments built in setUp.
- */
- @Override
- protected List<List<IndexSegment>> getDistinctInstances() {
- if (_useIdenticalSegment) {
- return Collections.singletonList(_indexSegments);
- }
- return _distinctInstances;
- }
-
- /**
- * Generates NUM_RECORDS random rows into the column fields and precomputes the expected sums, population
- * covariances and per-group results. It then builds the full test segment plus four distinct quarter
- * segments split across two instances.
- * NOTE(review): Random is unseeded, so a data-dependent failure cannot be reproduced.
- */
- @BeforeClass
- public void setUp()
- throws Exception {
- FileUtils.deleteDirectory(INDEX_DIR);
-
- List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
-
- Random rand = new Random();
- _intColX = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
- _intColY = rand.ints(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
- _doubleColX = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
- _doubleColY = rand.doubles(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
- _longCol = rand.longs(NUM_RECORDS, -MAX_VALUE, MAX_VALUE).toArray();
-
- int groupSize = NUM_RECORDS / NUM_GROUPS;
- // running sums for the group currently being filled
- double sumX = 0;
- double sumY = 0;
- double sumGroupBy = 0;
- double sumXY = 0;
- double sumXGroupBy = 0;
- int groupByVal = 0;
-
- for (int i = 0; i < NUM_RECORDS; i++) {
- GenericRow record = new GenericRow();
- int intX = _intColX[i];
- int intY = _intColY[i];
- double doubleX = _doubleColX[i];
- double doubleY = _doubleColY[i];
- long longVal = _longCol[i];
- float floatVal = -MAX_VALUE + rand.nextFloat() * 2 * MAX_VALUE;
-
- // set up inner segment group by results
- // (i / groupSize is already an integer division; Math.floor is a no-op here)
- groupByVal = (int) Math.floor(i / groupSize);
- if (i % groupSize == 0 && groupByVal > 0) {
- // a new group starts: store the finished previous group's sums and reset the accumulators
- _expectedGroupByResultVer1[groupByVal - 1] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
- _expectedGroupByResultVer2[groupByVal - 1] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
- sumX = 0;
- sumY = 0;
- sumGroupBy = 0;
- sumXY = 0;
- sumXGroupBy = 0;
- }
-
- sumX += doubleX;
- sumY += doubleY;
- sumGroupBy += groupByVal;
- sumXY += doubleX * doubleY;
- sumXGroupBy += doubleX * groupByVal;
-
- _floatCol[i] = floatVal;
- _groupByCol[i] = groupByVal;
-
- // calculate inner segment results
- _sumIntX += intX;
- _sumIntY += intY;
- _sumDoubleX += doubleX;
- _sumDoubleY += doubleY;
- _sumLong += longVal;
- _sumFloat += floatVal;
- _sumIntXY += intX * intY;
- _sumDoubleXY += doubleX * doubleY;
- _sumIntDouble += intX * doubleX;
- _sumIntLong += intX * longVal;
- _sumIntFloat += intX * _floatCol[i];
- _sumDoubleLong += doubleX * longVal;
- _sumDoubleFloat += doubleX * _floatCol[i];
- _sumLongFloat += longVal * _floatCol[i];
-
- record.putValue(INT_COLUMN_X, intX);
- record.putValue(INT_COLUMN_Y, intY);
- record.putValue(DOUBLE_COLUMN_X, doubleX);
- record.putValue(DOUBLE_COLUMN_Y, doubleY);
- record.putValue(LONG_COLUMN, longVal);
- record.putValue(FLOAT_COLUMN, floatVal);
- record.putValue(GROUP_BY_COLUMN, groupByVal);
- records.add(record);
- }
- // the last group is never closed inside the loop
- _expectedGroupByResultVer1[groupByVal] = new CovarianceTuple(sumX, sumGroupBy, sumXGroupBy, groupSize);
- _expectedGroupByResultVer2[groupByVal] = new CovarianceTuple(sumX, sumY, sumXY, groupSize);
-
- // calculate inter segment result (population covariance: biasCorrected = false)
- Covariance cov = new Covariance();
- double[] newIntColX = Arrays.stream(_intColX).asDoubleStream().toArray();
- double[] newIntColY = Arrays.stream(_intColY).asDoubleStream().toArray();
- double[] newLongCol = Arrays.stream(_longCol).asDoubleStream().toArray();
- _expectedCovIntXY = cov.covariance(newIntColX, newIntColY, false);
- _expectedCovDoubleXY = cov.covariance(_doubleColX, _doubleColY, false);
- _expectedCovIntDouble = cov.covariance(newIntColX, _doubleColX, false);
- _expectedCovIntLong = cov.covariance(newIntColX, newLongCol, false);
- _expectedCovIntFloat = cov.covariance(newIntColX, _floatCol, false);
- _expectedCovDoubleLong = cov.covariance(_doubleColX, newLongCol, false);
- _expectedCovDoubleFloat = cov.covariance(_doubleColX, _floatCol, false);
- _expectedCovLongFloat = cov.covariance(newLongCol, _floatCol, false);
-
- // getFilter() keeps the first half of the groups, i.e. the first half of the rows
- double[] filteredX = Arrays.copyOfRange(_doubleColX, 0, NUM_RECORDS / 2);
- double[] filteredY = Arrays.copyOfRange(_doubleColY, 0, NUM_RECORDS / 2);
- _expectedCovWithFilter = cov.covariance(filteredX, filteredY, false);
-
- // calculate inter segment group by results
- for (int i = 0; i < NUM_GROUPS; i++) {
- double[] colX = Arrays.copyOfRange(_doubleColX, i * groupSize, (i + 1) * groupSize);
- double[] colGroupBy = Arrays.copyOfRange(_groupByCol, i * groupSize, (i + 1) * groupSize);
- double[] colY = Arrays.copyOfRange(_doubleColY, i * groupSize, (i + 1) * groupSize);
- _expectedFinalResultVer1[i] = cov.covariance(colX, colGroupBy, false);
- _expectedFinalResultVer2[i] = cov.covariance(colX, colY, false);
- }
-
- // generate testSegment
- ImmutableSegment immutableSegment = setUpSingleSegment(records, SEGMENT_NAME);
- _indexSegment = immutableSegment;
- _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
-
- // divide testSegment into 4 distinct segments for distinct inter segment tests
- // by doing so, we can avoid calculating global covariance again
- _distinctInstances = new ArrayList<>();
- int segmentSize = NUM_RECORDS / 4;
- ImmutableSegment immutableSegment1 = setUpSingleSegment(records.subList(0, segmentSize), SEGMENT_NAME_1);
- ImmutableSegment immutableSegment2 =
- setUpSingleSegment(records.subList(segmentSize, segmentSize * 2), SEGMENT_NAME_2);
- ImmutableSegment immutableSegment3 =
- setUpSingleSegment(records.subList(segmentSize * 2, segmentSize * 3), SEGMENT_NAME_3);
- ImmutableSegment immutableSegment4 =
- setUpSingleSegment(records.subList(segmentSize * 3, NUM_RECORDS), SEGMENT_NAME_4);
- // generate 2 instances each with 2 distinct segments
- _distinctInstances.add(Arrays.asList(immutableSegment1, immutableSegment2));
- _distinctInstances.add(Arrays.asList(immutableSegment3, immutableSegment4));
- }
-
- /**
- * Builds an on-disk segment named {@code segmentName} under INDEX_DIR from {@code recordSet}. The
- * segment is then loaded back with ReadMode.mmap.
- */
- private ImmutableSegment setUpSingleSegment(List<GenericRow> recordSet, String segmentName)
- throws Exception {
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
- segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
- segmentGeneratorConfig.setSegmentName(segmentName);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
-
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, new GenericRowRecordReader(recordSet));
- driver.build();
-
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
- }
-
- /**
- * Checks COVAR_POP over every numeric column pairing at three levels:
- * <ul>
- * <li>the inner-segment intermediate CovarianceTuples (raw sums and count);</li>
- * <li>broker results over identical and over distinct segments;</li>
- * <li>a filtered broker query.</li>
- * </ul>
- * NOTE(review): _useIdenticalSegment is reset without try/finally. An exception from getBrokerResponse
- * leaves it set for later tests.
- */
- @Test
- public void testCovarianceAggregationOnly() {
- // Inner Segment
- String query =
- "SELECT COVAR_POP(intColumnX, intColumnY), COVAR_POP(doubleColumnX, doubleColumnY), COVAR_POP(intColumnX, "
- + "doubleColumnX), " + "COVAR_POP(intColumnX, longColumn), COVAR_POP(intColumnX, floatColumn), "
- + "COVAR_POP(doubleColumnX, longColumn), COVAR_POP(doubleColumnX, floatColumn), COVAR_POP(longColumn, "
- + "floatColumn) FROM testTable";
- AggregationOperator aggregationOperator = getOperator(query);
- AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
- // the query reads 6 distinct columns
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 6, NUM_RECORDS);
- List<Object> aggregationResult = resultsBlock.getResults();
- assertNotNull(aggregationResult);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(0), _sumIntX, _sumIntY, _sumIntXY,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(1), _sumDoubleX, _sumDoubleY, _sumDoubleXY,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(2), _sumIntX, _sumDoubleX, _sumIntDouble,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(3), _sumIntX, _sumLong, _sumIntLong,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(4), _sumIntX, _sumFloat, _sumIntFloat,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(5), _sumDoubleX, _sumLong, _sumDoubleLong,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(6), _sumDoubleX, _sumFloat, _sumDoubleFloat,
- NUM_RECORDS);
- checkWithPrecisionForCovariance((CovarianceTuple) aggregationResult.get(7), _sumLong, _sumFloat, _sumLongFloat,
- NUM_RECORDS);
-
- // Inter segments with 4 identical segments (2 instances each having 2 identical segments)
- _useIdenticalSegment = true;
- BrokerResponseNative brokerResponse = getBrokerResponse(query);
- _useIdenticalSegment = false;
- assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 6 * NUM_RECORDS);
- assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
- checkResultTableWithPrecisionForCovariance(brokerResponse);
-
- // Inter segments with 4 distinct segments (2 instances each having 2 distinct segments)
- brokerResponse = getBrokerResponse(query);
- assertEquals(brokerResponse.getNumDocsScanned(), NUM_RECORDS);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 6 * NUM_RECORDS);
- assertEquals(brokerResponse.getTotalDocs(), NUM_RECORDS);
- checkResultTableWithPrecisionForCovariance(brokerResponse);
-
- // Inter segments with 4 identical segments with filter
- // (half of the rows pass the filter; 2 columns are read per matching row)
- _useIdenticalSegment = true;
- query = "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable" + getFilter();
- brokerResponse = getBrokerResponse(query);
- _useIdenticalSegment = false;
- assertEquals(brokerResponse.getNumDocsScanned(), 2 * NUM_RECORDS);
- assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
- assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * NUM_RECORDS);
- assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
- Object[] results = brokerResponse.getResultTable().getRows().get(0);
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovWithFilter, RELATIVE_EPSILON));
- }
-
- /**
- * Checks grouped COVAR_POP in two cases:
- * <ol>
- * <li>against the group-by column itself, where every covariance is 0;</li>
- * <li>against an independent random column.</li>
- * </ol>
- * Each case is verified at the inner-segment level and through the broker over identical and distinct
- * segments.
- * NOTE(review): _useIdenticalSegment is reset only after the result check. A failed assertion leaves it
- * set for later tests.
- */
- @Test
- public void testCovarianceAggregationGroupBy() {
- // Inner Segment
- // case 1: (col1, groupByCol) group by groupByCol => all covariances are 0's
- String query =
- "SELECT COVAR_POP(doubleColumnX, groupByColumn) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
- GroupByOperator groupByOperator = getOperator(query);
- GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 2, NUM_RECORDS);
- AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
- assertNotNull(aggregationGroupByResult);
- for (int i = 0; i < NUM_GROUPS; i++) {
- CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
- CovarianceTuple expectedCovTuple = _expectedGroupByResultVer1[i];
- checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple);
- }
-
- // Inter Segment with 4 identical segments
- _useIdenticalSegment = true;
- BrokerResponseNative brokerResponse = getBrokerResponse(query);
- checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1);
- _useIdenticalSegment = false;
- // Inter Segment with 4 distinct segments
- brokerResponse = getBrokerResponse(query);
- checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer1);
-
- // Inner Segment
- // case 2: COVAR_POP(col1, col2) group by groupByCol => nondeterministic cov
- query =
- "SELECT COVAR_POP(doubleColumnX, doubleColumnY) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
- groupByOperator = getOperator(query);
- resultsBlock = groupByOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 3, NUM_RECORDS);
- aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
- assertNotNull(aggregationGroupByResult);
-
- for (int i = 0; i < NUM_GROUPS; i++) {
- CovarianceTuple actualCovTuple = (CovarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
- CovarianceTuple expectedCovTuple = _expectedGroupByResultVer2[i];
- checkWithPrecisionForCovariance(actualCovTuple, expectedCovTuple);
- }
-
- // Inter Segment with 4 identical segments
- _useIdenticalSegment = true;
- brokerResponse = getBrokerResponse(query);
- checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2);
- _useIdenticalSegment = false;
- // Inter Segment with 4 distinct segments
- brokerResponse = getBrokerResponse(query);
- checkGroupByResultsForCovariance(brokerResponse, _expectedFinalResultVer2);
- }
-
- @Test
- public void testVarianceAggregationOnly() {
- // Compute the expected values
- Variance[] expectedVariances = new Variance[8];
- for (int i = 0; i < 8; i++) {
- if (i < 4) {
- expectedVariances[i] = new Variance(false);
- } else {
- expectedVariances[i] = new Variance(true);
- }
- }
- for (int i = 0; i < NUM_RECORDS; i++) {
- expectedVariances[0].increment(_intColX[i]);
- expectedVariances[1].increment(_longCol[i]);
- expectedVariances[2].increment(_floatCol[i]);
- expectedVariances[3].increment(_doubleColX[i]);
- expectedVariances[4].increment(_intColX[i]);
- expectedVariances[5].increment(_longCol[i]);
- expectedVariances[6].increment(_floatCol[i]);
- expectedVariances[7].increment(_doubleColX[i]);
- }
- double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum();
- double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum();
- double expectedFloatSum = 0.0;
- for (int i = 0; i < _floatCol.length; i++) {
- expectedFloatSum += _floatCol[i];
- }
- double expectedDoubleSum = Arrays.stream(_doubleColX).sum();
-
- // Compute the query
- String query = "SELECT VAR_POP(intColumnX), VAR_POP(longColumn), VAR_POP(floatColumn), VAR_POP(doubleColumnX),"
- + "VAR_SAMP(intColumnX), VAR_SAMP(longColumn), VAR_SAMP(floatColumn), VAR_SAMP(doubleColumnX) FROM testTable";
- AggregationOperator aggregationOperator = getOperator(query);
- AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 4, NUM_RECORDS);
- List<Object> aggregationResult = resultsBlock.getResults();
-
- // Validate the aggregation results
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum,
- expectedVariances[0].getResult(), false);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum,
- expectedVariances[1].getResult(), false);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum,
- expectedVariances[2].getResult(), false);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum,
- expectedVariances[3].getResult(), false);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum,
- expectedVariances[4].getResult(), true);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum,
- expectedVariances[5].getResult(), true);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum,
- expectedVariances[6].getResult(), true);
- checkWithPrecisionForVariance((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum,
- expectedVariances[7].getResult(), true);
-
- // Validate the response
- BrokerResponseNative brokerResponse = getBrokerResponse(query);
- Object[] results = brokerResponse.getResultTable().getRows().get(0);
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[0], expectedVariances[0].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[1], expectedVariances[1].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[2], expectedVariances[2].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[3], expectedVariances[3].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[4], expectedVariances[4].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[5], expectedVariances[5].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[6], expectedVariances[6].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[7], expectedVariances[7].getResult(), RELATIVE_EPSILON));
-
- VarianceTuple test = ((VarianceTuple) aggregationResult.get(0));
- test.apply((new VarianceTuple(0, 0, 0.0d)));
- System.out.println(test.getM2());
- // Validate the response for a query with a filter
- query = "SELECT VAR_POP(intColumnX) from testTable" + getFilter();
- brokerResponse = getBrokerResponse(query);
- brokerResponse.getResultTable();
- results = brokerResponse.getResultTable().getRows().get(0);
- Variance filterExpectedVariance = new Variance(false);
- for (int i = 0; i < NUM_RECORDS / 2; i++) {
- filterExpectedVariance.increment(_intColX[i]);
- }
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedVariance.getResult(),
- RELATIVE_EPSILON));
- }
-
- @Test
- public void testVarianceAggregationGroupBy() {
- // Compute expected group results
- Variance[] expectedGroupByResult = new Variance[NUM_GROUPS];
- double[] expectedSum = new double[NUM_GROUPS];
-
- for (int i = 0; i < NUM_GROUPS; i++) {
- expectedGroupByResult[i] = new Variance(false);
- }
- for (int j = 0; j < NUM_RECORDS; j++) {
- int pos = j / (NUM_RECORDS / NUM_GROUPS);
- expectedGroupByResult[pos].increment(_intColX[j]);
- expectedSum[pos] += _intColX[j];
- }
-
- String query = "SELECT VAR_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
- GroupByOperator groupByOperator = getOperator(query);
- GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 2, NUM_RECORDS);
- AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
- assertNotNull(aggregationGroupByResult);
- for (int i = 0; i < NUM_GROUPS; i++) {
-
- VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
- checkWithPrecisionForVariance(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i],
- expectedGroupByResult[i].getResult(), false);
- }
- }
-
- @Test
- public void testStandardDeviationAggregationOnly() {
- // Compute the expected values
- StandardDeviation[] expectedStdDevs = new StandardDeviation[8];
- for (int i = 0; i < 8; i++) {
- if (i < 4) {
- expectedStdDevs[i] = new StandardDeviation(false);
- } else {
- expectedStdDevs[i] = new StandardDeviation(true);
- }
- }
- for (int i = 0; i < NUM_RECORDS; i++) {
- expectedStdDevs[0].increment(_intColX[i]);
- expectedStdDevs[1].increment(_longCol[i]);
- expectedStdDevs[2].increment(_floatCol[i]);
- expectedStdDevs[3].increment(_doubleColX[i]);
- expectedStdDevs[4].increment(_intColX[i]);
- expectedStdDevs[5].increment(_longCol[i]);
- expectedStdDevs[6].increment(_floatCol[i]);
- expectedStdDevs[7].increment(_doubleColX[i]);
- }
-
- double expectedIntSum = Arrays.stream(_intColX).asDoubleStream().sum();
- double expectedLongSum = Arrays.stream(_longCol).asDoubleStream().sum();
- double expectedFloatSum = 0.0;
- for (int i = 0; i < _floatCol.length; i++) {
- expectedFloatSum += _floatCol[i];
- }
- double expectedDoubleSum = Arrays.stream(_doubleColX).sum();
-
- // Compute the query
- String query =
- "SELECT STDDEV_POP(intColumnX), STDDEV_POP(longColumn), STDDEV_POP(floatColumn), STDDEV_POP(doubleColumnX),"
- + "STDDEV_SAMP(intColumnX), STDDEV_SAMP(longColumn), STDDEV_SAMP(floatColumn), STDDEV_SAMP(doubleColumnX) "
- + "FROM testTable";
- AggregationOperator aggregationOperator = getOperator(query);
- AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 4, NUM_RECORDS);
- List<Object> aggregationResult = resultsBlock.getResults();
-
- // Validate the aggregation results
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(0), NUM_RECORDS, expectedIntSum,
- expectedStdDevs[0].getResult(), false);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(1), NUM_RECORDS, expectedLongSum,
- expectedStdDevs[1].getResult(), false);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(2), NUM_RECORDS, expectedFloatSum,
- expectedStdDevs[2].getResult(), false);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(3), NUM_RECORDS, expectedDoubleSum,
- expectedStdDevs[3].getResult(), false);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(4), NUM_RECORDS, expectedIntSum,
- expectedStdDevs[4].getResult(), true);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(5), NUM_RECORDS, expectedLongSum,
- expectedStdDevs[5].getResult(), true);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(6), NUM_RECORDS, expectedFloatSum,
- expectedStdDevs[6].getResult(), true);
- checkWithPrecisionForStandardDeviation((VarianceTuple) aggregationResult.get(7), NUM_RECORDS, expectedDoubleSum,
- expectedStdDevs[7].getResult(), true);
-
- // Validate the response
- BrokerResponseNative brokerResponse = getBrokerResponse(query);
- brokerResponse.getResultTable();
- Object[] results = brokerResponse.getResultTable().getRows().get(0);
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[0], expectedStdDevs[0].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[1], expectedStdDevs[1].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[2], expectedStdDevs[2].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[3], expectedStdDevs[3].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[4], expectedStdDevs[4].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[5], expectedStdDevs[5].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[6], expectedStdDevs[6].getResult(), RELATIVE_EPSILON));
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[7], expectedStdDevs[7].getResult(), RELATIVE_EPSILON));
-
- // Validate the response for a query with a filter
- query = "SELECT STDDEV_POP(intColumnX) from testTable" + getFilter();
- brokerResponse = getBrokerResponse(query);
- brokerResponse.getResultTable();
- results = brokerResponse.getResultTable().getRows().get(0);
- StandardDeviation filterExpectedStdDev = new StandardDeviation(false);
- for (int i = 0; i < NUM_RECORDS / 2; i++) {
- filterExpectedStdDev.increment(_intColX[i]);
- }
- assertTrue(
- Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedStdDev.getResult(), RELATIVE_EPSILON));
- }
-
- @Test
- public void testStandardDeviationAggreagtionGroupBy() {
- // Compute expected group results
- StandardDeviation[] expectedGroupByResult = new StandardDeviation[NUM_GROUPS];
- double[] expectedSum = new double[NUM_GROUPS];
-
- for (int i = 0; i < NUM_GROUPS; i++) {
- expectedGroupByResult[i] = new StandardDeviation(false);
- }
- for (int j = 0; j < NUM_RECORDS; j++) {
- int pos = j / (NUM_RECORDS / NUM_GROUPS);
- expectedGroupByResult[pos].increment(_intColX[j]);
- expectedSum[pos] += _intColX[j];
- }
-
- String query = "SELECT STDDEV_POP(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn";
- GroupByOperator groupByOperator = getOperator(query);
- GroupByResultsBlock resultsBlock = groupByOperator.nextBlock();
- QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
- NUM_RECORDS * 2, NUM_RECORDS);
- AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
- assertNotNull(aggregationGroupByResult);
- for (int i = 0; i < NUM_GROUPS; i++) {
- VarianceTuple actualVarianceTuple = (VarianceTuple) aggregationGroupByResult.getResultForGroupId(0, i);
- checkWithPrecisionForStandardDeviation(actualVarianceTuple, NUM_RECORDS / NUM_GROUPS, expectedSum[i],
- expectedGroupByResult[i].getResult(), false);
- }
- }
-
- private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double sumX, double sumY, double sumXY,
- int count) {
- assertEquals(tuple.getCount(), count);
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumX(), sumX, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumY(), sumY, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSumXY(), sumXY, RELATIVE_EPSILON));
- }
-
- private void checkWithPrecisionForCovariance(CovarianceTuple actual, CovarianceTuple expected) {
- checkWithPrecisionForCovariance(actual, expected.getSumX(), expected.getSumY(), expected.getSumXY(),
- (int) expected.getCount());
- }
-
- private void checkResultTableWithPrecisionForCovariance(BrokerResponseNative brokerResponse) {
- Object[] results = brokerResponse.getResultTable().getRows().get(0);
- assertEquals(results.length, 8);
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[0], _expectedCovIntXY, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[1], _expectedCovDoubleXY, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[2], _expectedCovIntDouble, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[3], _expectedCovIntLong, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[4], _expectedCovIntFloat, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[5], _expectedCovDoubleLong, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[6], _expectedCovDoubleFloat, RELATIVE_EPSILON));
- assertTrue(Precision.equalsWithRelativeTolerance((double) results[7], _expectedCovLongFloat, RELATIVE_EPSILON));
- }
-
- private void checkGroupByResultsForCovariance(BrokerResponseNative brokerResponse, double[] expectedResults) {
- ResultTable resultTable = brokerResponse.getResultTable();
- List<Object[]> rows = resultTable.getRows();
- for (int i = 0; i < NUM_GROUPS; i++) {
- assertTrue(Precision.equals((double) rows.get(i)[0], expectedResults[i], DELTA));
- }
- }
-
- private void checkWithPrecisionForVariance(VarianceTuple tuple, int expectedCount, double expectedSum,
- double expectedVariance, boolean isBiasCorrected) {
- assertEquals(tuple.getCount(), expectedCount);
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON));
- if (!isBiasCorrected) {
- assertTrue(
- Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * expectedCount, RELATIVE_EPSILON));
- } else {
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedVariance * (expectedCount - 1),
- RELATIVE_EPSILON));
- }
- }
-
- private void checkWithPrecisionForStandardDeviation(VarianceTuple tuple, int expectedCount, double expectedSum,
- double expectedStdDev, boolean isBiasCorrected) {
- assertEquals(tuple.getCount(), expectedCount);
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getSum(), expectedSum, RELATIVE_EPSILON));
- if (!isBiasCorrected) {
- assertTrue(Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * expectedCount,
- RELATIVE_EPSILON));
- } else {
- assertTrue(
- Precision.equalsWithRelativeTolerance(tuple.getM2(), expectedStdDev * expectedStdDev * (expectedCount - 1),
- RELATIVE_EPSILON));
- }
- }
-
- @AfterClass
- public void tearDown()
- throws IOException {
- _indexSegment.destroy();
- for (List<IndexSegment> indexList : _distinctInstances) {
- for (IndexSegment seg : indexList) {
- seg.destroy();
- }
- }
- FileUtils.deleteDirectory(INDEX_DIR);
- }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java
deleted file mode 100644
index 09594364b4..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/VarianceTuple.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.customobject;
-
-import java.nio.ByteBuffer;
-
-
/**
 * Mergeable partial state for variance and standard deviation aggregations.
 *
 * <p>The state has three fields:
 * <ul>
 *   <li>the number of values;</li>
 *   <li>their sum;</li>
 *   <li>M2, the sum of squared deviations from the mean.</li>
 * </ul>
 *
 * <p>Partial states are combined with the pairwise (parallel) update
 * {@code M2 = M2_a + M2_b + delta^2 * n_a * n_b / (n_a + n_b)}, where {@code delta} is the difference of the
 * two means.
 */
public class VarianceTuple implements Comparable<VarianceTuple> {
  private long _count;
  private double _sum;
  private double _m2;

  public VarianceTuple(long count, double sum, double m2) {
    _count = count;
    _sum = sum;
    _m2 = m2;
  }

  /**
   * Merges another partial state, given by its components, into this tuple.
   *
   * @param count number of values in the other state; a count of 0 makes this a no-op
   * @param sum sum of the values in the other state
   * @param m2 sum of squared deviations from the mean in the other state
   */
  public void apply(long count, double sum, double m2) {
    if (count == 0) {
      return;
    }
    if (_count == 0) {
      // This tuple is empty, so its mean (_sum / _count) is 0/0 = NaN and would poison M2.
      // Adopt the incoming state instead.
      _count += count;
      _sum += sum;
      _m2 += m2;
      return;
    }
    double delta = (sum / count) - (_sum / _count);
    _m2 += m2 + delta * delta * count * _count / (count + _count);
    _count += count;
    _sum += sum;
  }

  /**
   * Merges another tuple into this one. The argument is not modified.
   */
  public void apply(VarianceTuple varianceTuple) {
    apply(varianceTuple._count, varianceTuple._sum, varianceTuple._m2);
  }

  public long getCount() {
    return _count;
  }

  public double getSum() {
    return _sum;
  }

  public double getM2() {
    return _m2;
  }

  /**
   * Serializes this tuple as count (long), sum (double), M2 (double) — 24 bytes in ByteBuffer's default big-endian
   * order.
   */
  public byte[] toBytes() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES * 2 + Long.BYTES);
    byteBuffer.putLong(_count);
    byteBuffer.putDouble(_sum);
    byteBuffer.putDouble(_m2);
    return byteBuffer.array();
  }

  public static VarianceTuple fromBytes(byte[] bytes) {
    return fromByteBuffer(ByteBuffer.wrap(bytes));
  }

  /**
   * Reads a tuple in the layout written by {@link #toBytes()}, advancing the buffer's position.
   */
  public static VarianceTuple fromByteBuffer(ByteBuffer byteBuffer) {
    return new VarianceTuple(byteBuffer.getLong(), byteBuffer.getDouble(), byteBuffer.getDouble());
  }

  /**
   * Orders tuples as follows.
   * <ul>
   *   <li>Empty tuples (count 0) sort before non-empty ones.</li>
   *   <li>Non-empty tuples are ordered by M2.</li>
   *   <li>Incomparable M2 values (NaN) compare as equal.</li>
   * </ul>
   */
  @Override
  public int compareTo(VarianceTuple varianceTuple) {
    if (_count == 0) {
      return varianceTuple._count == 0 ? 0 : -1;
    }
    if (varianceTuple._count == 0) {
      return 1;
    }
    if (_m2 > varianceTuple._m2) {
      return 1;
    }
    if (_m2 < varianceTuple._m2) {
      return -1;
    }
    return 0;
  }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 584c99a7b2..19d2f4d6e5 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -59,10 +59,6 @@ public enum AggregationFunctionType {
HISTOGRAM("histogram"),
COVARPOP("covarPop"),
COVARSAMP("covarSamp"),
- VARPOP("varPop"),
- VARSAMP("varSamp"),
- STDDEVPOP("stdDevPop"),
- STDDEVSAMP("stdDevSamp"),
// Geo aggregation functions
STUNION("STUnion"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org