You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2019/06/25 18:40:47 UTC
[incubator-pinot] branch master updated: Adding support for udfs in Selection (#4346)
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b75a4c0 Adding support for udfs in Selection (#4346)
b75a4c0 is described below
commit b75a4c00169d68f9aabc87b01424664b73081ed3
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Tue Jun 25 11:40:43 2019 -0700
Adding support for udfs in Selection (#4346)
* Adding support for udfs in Selection (#22)
* Adding support for udfs in Selection
* Addressing review comments
* Adding license headers
* Adding support for bytes type in order by
* Minor fixes for selection-udf. Adding test cases (#25)
* More fixes for UDF in order by
* Fixing bugs in Order By. Enhancing test case to be independent of the column order in the result
* Addressing review comments and deleting old SelectionOnly and SelectionOrderBy operators
* Removing commented line
---
.../common}/function/AggregationFunctionType.java | 5 +-
.../function/FunctionDefinitionRegistry.java | 36 +++
.../pinot/common/utils/request/RequestUtils.java | 3 +-
.../parsers/PinotQuery2BrokerRequestConverter.java | 19 +-
.../pinot/pql/parsers/pql2/ast/OrderByAstNode.java | 5 +-
.../pql/parsers/pql2/ast/OutputColumnAstNode.java | 20 +-
.../core/data/aggregator/AvgValueAggregator.java | 2 +-
.../core/data/aggregator/CountValueAggregator.java | 2 +-
.../DistinctCountHLLValueAggregator.java | 2 +-
.../core/data/aggregator/MaxValueAggregator.java | 2 +-
.../aggregator/MinMaxRangeValueAggregator.java | 2 +-
.../core/data/aggregator/MinValueAggregator.java | 2 +-
.../aggregator/PercentileEstValueAggregator.java | 2 +-
.../PercentileTDigestValueAggregator.java | 2 +-
.../core/data/aggregator/SumValueAggregator.java | 2 +-
.../core/data/aggregator/ValueAggregator.java | 2 +-
.../data/aggregator/ValueAggregatorFactory.java | 2 +-
.../query/DictionaryBasedAggregationOperator.java | 2 +-
.../query/MetadataBasedAggregationOperator.java | 2 +-
.../core/operator/query/SelectionOnlyOperator.java | 109 -------
.../core/operator/query/SelectionOperator.java | 282 +++++++++++++++++
.../operator/query/SelectionOrderByOperator.java | 116 -------
.../transform/TransformBlockDataFetcher.java | 340 +++++++++++++++++++++
.../core/operator/transform/TransformOperator.java | 13 +-
.../plan/MetadataBasedAggregationPlanNode.java | 2 +-
.../apache/pinot/core/plan/SelectionPlanNode.java | 38 +--
.../apache/pinot/core/plan/TransformPlanNode.java | 50 ++-
.../core/plan/maker/InstancePlanMakerImplV2.java | 2 +-
.../aggregation/DefaultAggregationExecutor.java | 2 +-
.../aggregation/function/AggregationFunction.java | 1 +
.../function/AggregationFunctionFactory.java | 1 +
.../function/AggregationFunctionUtils.java | 1 +
.../function/AvgAggregationFunction.java | 1 +
.../function/AvgMVAggregationFunction.java | 1 +
.../function/CountAggregationFunction.java | 1 +
.../function/CountMVAggregationFunction.java | 1 +
.../function/DistinctCountAggregationFunction.java | 1 +
.../DistinctCountHLLAggregationFunction.java | 1 +
.../DistinctCountHLLMVAggregationFunction.java | 1 +
.../DistinctCountMVAggregationFunction.java | 1 +
.../DistinctCountRawHLLAggregationFunction.java | 1 +
.../DistinctCountRawHLLMVAggregationFunction.java | 1 +
.../function/FastHLLAggregationFunction.java | 1 +
.../function/MaxAggregationFunction.java | 1 +
.../function/MaxMVAggregationFunction.java | 1 +
.../function/MinAggregationFunction.java | 1 +
.../function/MinMVAggregationFunction.java | 1 +
.../function/MinMaxRangeAggregationFunction.java | 1 +
.../function/MinMaxRangeMVAggregationFunction.java | 1 +
.../function/PercentileAggregationFunction.java | 1 +
.../function/PercentileEstAggregationFunction.java | 1 +
.../PercentileEstMVAggregationFunction.java | 1 +
.../function/PercentileMVAggregationFunction.java | 1 +
.../PercentileTDigestAggregationFunction.java | 1 +
.../PercentileTDigestMVAggregationFunction.java | 1 +
.../function/SumAggregationFunction.java | 1 +
.../function/SumMVAggregationFunction.java | 1 +
.../groupby/DefaultGroupByExecutor.java | 2 +-
.../core/query/request/ServerQueryRequest.java | 18 +-
.../query/selection/SelectionOperatorUtils.java | 4 +-
.../executor/StarTreeAggregationExecutor.java | 2 +-
.../startree/executor/StarTreeGroupByExecutor.java | 2 +-
.../startree/plan/StarTreeTransformPlanNode.java | 3 +-
.../startree/v2/AggregationFunctionColumnPair.java | 2 +-
.../startree/v2/builder/BaseSingleTreeBuilder.java | 2 +-
.../startree/v2/store/StarTreeLoaderUtils.java | 2 +-
.../function/AggregationFunctionFactoryTest.java | 1 +
.../function/AggregationFunctionTypeTest.java | 1 +
.../v2/AggregationFunctionColumnPairTest.java | 2 +-
.../pinot/core/startree/v2/BaseStarTreeV2Test.java | 2 +-
...InnerSegmentSelectionMultiValueQueriesTest.java | 123 +++++---
...nnerSegmentSelectionSingleValueQueriesTest.java | 146 +++++----
.../DefaultAggregationExecutorTest.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 86 +++++-
.../tests/StarTreeV2ClusterIntegrationTest.java | 2 +-
75 files changed, 1072 insertions(+), 426 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
similarity index 93%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java
rename to pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index d4b7ef8..03b39c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -16,10 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.core.query.aggregation.function;
+package org.apache.pinot.common.function;
import javax.annotation.Nonnull;
-import org.apache.pinot.core.query.exception.BadQueryRequestException;
public enum AggregationFunctionType {
@@ -99,7 +98,7 @@ public enum AggregationFunctionType {
return AggregationFunctionType.valueOf(upperCaseFunctionName);
}
} catch (Exception e) {
- throw new BadQueryRequestException("Invalid aggregation function name: " + functionName);
+ throw new UnsupportedOperationException("Invalid aggregation function name: " + functionName);
}
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java
new file mode 100644
index 0000000..f651162
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.common.function;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Place where all functions are registered; as written here it only exposes isAggFunc, a check for aggregation function names.
+ */
+public class FunctionDefinitionRegistry {
+
+ public static boolean isAggFunc(String functionName) {
+ // Delegates to AggregationFunctionType.getAggregationFunctionType; any exception it throws is read as "not an aggregation".
+ try {
+ AggregationFunctionType.getAggregationFunctionType(functionName);
+ return true;
+ } catch (Exception e) {
+ // Deliberately swallowed: the lookup failed, so fall through and report false instead of propagating the error.
+ }
+ return false;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 1b3f110..58a2074 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.request;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -248,7 +249,7 @@ public class RequestUtils {
* Extracts all columns from the given selection, '*' will be ignored.
*/
public static Set<String> extractSelectionColumns(Selection selection) {
- Set<String> selectionColumns = new HashSet<>();
+ Set<String> selectionColumns = new LinkedHashSet<>();
for (String selectionColumn : selection.getSelectionColumns()) {
if (!selectionColumn.equals("*")) {
selectionColumns.add(selectionColumn);
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
index 7724d87..a157d8e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.function.FunctionDefinitionRegistry;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
@@ -127,11 +128,21 @@ public class PinotQuery2BrokerRequestConverter {
selection.addToSelectionColumns(expression.getIdentifier().getName());
break;
case FUNCTION:
- AggregationInfo aggInfo = buildAggregationInfo(expression.getFunctionCall());
- if (aggregationInfoList == null) {
- aggregationInfoList = new ArrayList<>();
+
+ Function functionCall = expression.getFunctionCall();
+ String functionName = functionCall.getOperator();
+ if (FunctionDefinitionRegistry.isAggFunc(functionName)) {
+ AggregationInfo aggInfo = buildAggregationInfo(functionCall);
+ if (aggregationInfoList == null) {
+ aggregationInfoList = new ArrayList<>();
+ }
+ aggregationInfoList.add(aggInfo);
+ } else {
+ if (selection == null) {
+ selection = new Selection();
+ }
+ selection.addToSelectionColumns(standardizeExpression(expression, false));
}
- aggregationInfoList.add(aggInfo);
break;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
index ee0677a..e776c24 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
@@ -24,6 +24,7 @@ import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.pql.parsers.Pql2CompilationException;
@@ -40,7 +41,9 @@ public class OrderByAstNode extends BaseAstNode {
if (astNode instanceof OrderByExpressionAstNode) {
OrderByExpressionAstNode node = (OrderByExpressionAstNode) astNode;
SelectionSort elem = new SelectionSort();
- elem.setColumn(node.getColumn());
+ TransformExpressionTree transformExpressionTree =
+ TransformExpressionTree.compileToExpressionTree(node.getColumn());
+ elem.setColumn(transformExpressionTree.toString());
elem.setIsAsc("asc".equalsIgnoreCase(node.getOrdering()));
selections.addToSelectionSortSequence(elem);
} else {
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
index 3798fbd..d53e201 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.pql.parsers.pql2.ast;
+import org.apache.pinot.common.function.FunctionDefinitionRegistry;
+import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.pql.parsers.Pql2CompilationException;
@@ -33,10 +36,21 @@ public class OutputColumnAstNode extends BaseAstNode {
@Override
public void updateBrokerRequest(BrokerRequest brokerRequest) {
for (AstNode astNode : getChildren()) {
- // If the column is a function call, it must be an aggregation function
if (astNode instanceof FunctionCallAstNode) {
- FunctionCallAstNode node = (FunctionCallAstNode) astNode;
- brokerRequest.addToAggregationsInfo(node.buildAggregationInfo());
+ String functionName = ((FunctionCallAstNode) astNode).getName();
+ if (FunctionDefinitionRegistry.isAggFunc(functionName)) {
+ FunctionCallAstNode node = (FunctionCallAstNode) astNode;
+ brokerRequest.addToAggregationsInfo(node.buildAggregationInfo());
+ } else {
+ Selection selection = brokerRequest.getSelections();
+ if (selection == null) {
+ selection = new Selection();
+ brokerRequest.setSelections(selection);
+ }
+ TransformExpressionTree transformExpressionTree =
+ TransformExpressionTree.compileToExpressionTree(((FunctionCallAstNode) astNode).getExpression());
+ selection.addToSelectionColumns(transformExpressionTree.toString());
+ }
} else if (astNode instanceof IdentifierAstNode) {
Selection selection = brokerRequest.getSelections();
if (selection == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
index e5841f7..7e57ddb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
index dfd130b..820e303 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
public class CountValueAggregator implements ValueAggregator<Void, Long> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index 15ad278..5b5e8c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -22,7 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
index 24ee224..b6b2280 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
public class MaxValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
index a6a8d55..60e2d94 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
index b5153a7..5b15f9a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
public class MinValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
index a57f324..857f54c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
index 9d398eb..6988921 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.aggregator;
import com.tdunning.math.stats.TDigest;
import org.apache.pinot.common.data.FieldSpec.DataType;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
index fcbc596..7712b40 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
public class SumValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
index 67ae929..7abcf1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
index eb3c688..2b4946d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.data.aggregator;
import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 192d0b8..0bb3de5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.DoubleAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
index 63b4a9a..5c9c36c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
deleted file mode 100644
index 01615ce..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.query;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.query.selection.SelectionFetcher;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-
-
-/**
- * This SelectionOnlyOperator will take care of applying a selection query to one IndexSegment.
- * nextBlock() will return an IntermediateResultBlock for the given IndexSegment.
- *
- *
- */
-public class SelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
- private static final String OPERATOR_NAME = "SelectionOnlyOperator";
-
- private final IndexSegment _indexSegment;
- private final ProjectionOperator _projectionOperator;
- private final DataSchema _dataSchema;
- private final Block[] _blocks;
- private final int _limitDocs;
- private final Collection<Serializable[]> _rowEvents;
- private ExecutionStatistics _executionStatistics;
-
- public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection, ProjectionOperator projectionOperator) {
- _indexSegment = indexSegment;
- _limitDocs = selection.getSize();
- _projectionOperator = projectionOperator;
- List<String> selectionColumns =
- SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), indexSegment);
- _dataSchema = SelectionOperatorUtils.extractDataSchema(null, selectionColumns, indexSegment);
- _blocks = new Block[selectionColumns.size()];
- _rowEvents = new ArrayList<>();
- }
-
- @Override
- protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
- ProjectionBlock projectionBlock;
- while ((projectionBlock = _projectionOperator.nextBlock()) != null) {
- for (int i = 0; i < _dataSchema.size(); i++) {
- _blocks[i] = projectionBlock.getBlock(_dataSchema.getColumnName(i));
- }
- SelectionFetcher selectionFetcher = new SelectionFetcher(_blocks, _dataSchema);
- DocIdSetBlock docIdSetBlock = projectionBlock.getDocIdSetBlock();
- int numDocsToFetch = Math.min(docIdSetBlock.getSearchableLength(), _limitDocs - _rowEvents.size());
- numDocsScanned += numDocsToFetch;
- int[] docIdSet = docIdSetBlock.getDocIdSet();
- for (int i = 0; i < numDocsToFetch; i++) {
- _rowEvents.add(selectionFetcher.getRow(docIdSet[i]));
- }
- if (_rowEvents.size() == _limitDocs) {
- break;
- }
- }
-
- // Create execution statistics.
- long numEntriesScannedInFilter = _projectionOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned * _projectionOperator.getNumColumnsProjected();
- long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
- numTotalRawDocs);
-
- return new IntermediateResultsBlock(_dataSchema, _rowEvents);
- }
-
- @Override
- public String getOperatorName() {
- return OPERATOR_NAME;
- }
-
- @Override
- public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java
new file mode 100644
index 0000000..c20d194
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.primitive.ByteArray;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformBlockDataFetcher;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * The <code>SelectionOperator</code> applies a selection query, with or without ORDER BY, to one
+ * IndexSegment. nextBlock() returns an IntermediateResultsBlock holding the selected rows.
+ *
+ * <p>Without ORDER BY, rows are collected in scan order until the selection size is reached. With
+ * ORDER BY, a bounded priority queue of at most (offset + size) rows is maintained. The queue's
+ * comparator is deliberately the reverse of the requested order, so the head of the queue is the
+ * row that sorts last and is the one evicted when a better row arrives.
+ */
+public class SelectionOperator extends BaseOperator<IntermediateResultsBlock> {
+
+  // NOTE(review): the reported name still says "SelectionOnlyOperator" although this operator also
+  // handles ORDER BY. Left unchanged because the string is observable at runtime.
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  // Upper bound on the pre-allocated queue size; the queue still grows on demand up to _maxRows.
+  private static final int MAX_INITIAL_QUEUE_CAPACITY = 10_000;
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final DataSchema _dataSchema;
+  private final BlockValSet[] _blockValSets;
+  private final int _limit;
+  private final Selection _selection;
+  private final int _offset;
+  private final int _maxRows;
+  private final List<TransformExpressionTree> _expressions;
+  private final List<TransformExpressionTree> _selectExpressions;
+  private final List<TransformExpressionTree> _orderByExpressions;
+  // Position (within _expressions, and therefore within each row) of every ORDER BY expression.
+  private final List<Integer> _orderByIndices;
+  private final TransformResultMetadata[] _expressionResultMetadata;
+  private final Dictionary[] _dictionaries;
+  // Exactly one of the two row holders is initialized: _rowEvents for selection-only queries,
+  // _priorityQueue for selection with ORDER BY.
+  private Collection<Serializable[]> _rowEvents;
+  private PriorityQueue<Serializable[]> _priorityQueue;
+
+  private ExecutionStatistics _executionStatistics;
+
+  /**
+   * @param indexSegment segment the query runs on
+   * @param selection selection request (columns, sort sequence, offset and size)
+   * @param transformOperator operator producing the values of all selection/order-by expressions
+   */
+  public SelectionOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) {
+    _indexSegment = indexSegment;
+    _selection = selection;
+    _offset = selection.getOffset();
+    _limit = selection.getSize();
+    _maxRows = _offset + _limit;
+    _transformOperator = transformOperator;
+    _expressions = _transformOperator.getExpressions();
+
+    List<String> selectColumns = selection.getSelectionColumns();
+    if (selectColumns.size() == 1 && selectColumns.get(0).equals("*")) {
+      selectColumns = new LinkedList<>(indexSegment.getPhysicalColumnNames());
+    }
+
+    int numExpressions = _expressions.size();
+    _selectExpressions = new ArrayList<>();
+    for (TransformExpressionTree expression : _expressions) {
+      if (selectColumns.contains(expression.toString())) {
+        _selectExpressions.add(expression);
+      }
+    }
+
+    // Resolve each ORDER BY column to the expression (and row position) that produces it.
+    _orderByExpressions = new ArrayList<>();
+    _orderByIndices = new ArrayList<>();
+    if (selection.getSelectionSortSequence() != null) {
+      for (SelectionSort selectionSort : selection.getSelectionSortSequence()) {
+        String orderByColumn = selectionSort.getColumn();
+        for (int i = 0; i < numExpressions; i++) {
+          TransformExpressionTree expression = _expressions.get(i);
+          if (orderByColumn.equalsIgnoreCase(expression.toString())) {
+            _orderByExpressions.add(expression);
+            _orderByIndices.add(i);
+          }
+        }
+      }
+    }
+
+    _blockValSets = new BlockValSet[numExpressions];
+    _expressionResultMetadata = new TransformResultMetadata[numExpressions];
+    _dictionaries = new Dictionary[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    String[] columnNames = new String[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      TransformExpressionTree expression = _expressions.get(i);
+      TransformResultMetadata resultMetadata = _transformOperator.getResultMetadata(expression);
+      _expressionResultMetadata[i] = resultMetadata;
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(resultMetadata.getDataType(), resultMetadata.isSingleValue());
+      columnNames[i] = expression.toString();
+      if (resultMetadata.hasDictionary()) {
+        _dictionaries[i] = _transformOperator.getDictionary(expression);
+      }
+    }
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    if (_orderByExpressions.isEmpty()) {
+      _rowEvents = new ArrayList<>();
+    } else {
+      // Cap the initial capacity so a large offset + size does not allocate a huge array up front.
+      _priorityQueue =
+          new PriorityQueue<>(Math.min(_maxRows, MAX_INITIAL_QUEUE_CAPACITY), getStrictComparator());
+    }
+  }
+
+  /**
+   * Builds the comparator used by the bounded priority queue.
+   *
+   * <p>The ordering is the reverse of the requested one (for an ascending column the larger value
+   * compares as smaller), so the queue head is always the row to drop first. Multi-value columns
+   * are skipped.
+   */
+  private Comparator<Serializable[]> getStrictComparator() {
+    final List<SelectionSort> sortSequence = _selection.getSelectionSortSequence();
+    return new Comparator<Serializable[]>() {
+      @Override
+      public int compare(Serializable[] o1, Serializable[] o2) {
+        int numSortColumns = sortSequence.size();
+        for (int i = 0; i < numSortColumns; i++) {
+          int index = _orderByIndices.get(i);
+          TransformResultMetadata resultMetadata = _expressionResultMetadata[index];
+          // Only compare single-value columns.
+          if (!resultMetadata.isSingleValue()) {
+            continue;
+          }
+
+          // Swap the operands for ascending order to obtain the reversed ordering.
+          boolean ascending = sortSequence.get(i).isIsAsc();
+          Serializable left = ascending ? o2[index] : o1[index];
+          Serializable right = ascending ? o1[index] : o2[index];
+
+          int ret = compareValues(resultMetadata.getDataType(), left, right);
+          if (ret != 0) {
+            return ret;
+          }
+        }
+        return 0;
+      }
+    };
+  }
+
+  /**
+   * Compares two single values of the given data type in natural order. Unsupported data types
+   * compare as equal.
+   */
+  private static int compareValues(DataType dataType, Serializable left, Serializable right) {
+    switch (dataType) {
+      case INT:
+        return ((Integer) left).compareTo((Integer) right);
+      case LONG:
+        return ((Long) left).compareTo((Long) right);
+      case FLOAT:
+        return ((Float) left).compareTo((Float) right);
+      case DOUBLE:
+        return ((Double) left).compareTo((Double) right);
+      case BOOLEAN:
+      case STRING:
+        return ((String) left).compareTo((String) right);
+      case BYTES:
+        if (left instanceof byte[] && right instanceof byte[]) {
+          return ByteArray.compare((byte[]) left, (byte[]) right);
+        }
+        // Dictionary-encoded single-value BYTES are fetched as hex strings rather than byte[]
+        // (see DictionaryBasedSVValueFetcher), so casting them to byte[] would fail.
+        // NOTE(review): assumes the hex encoding uses a single letter case so that string order
+        // matches byte order - confirm against BytesUtils.toHexString.
+        return ((String) left).compareTo((String) right);
+      default:
+        return 0;
+    }
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numDocsScanned = 0;
+    boolean selectionOnly = _orderByExpressions.isEmpty();
+    int numExpressions = _expressions.size();
+
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      for (int i = 0; i < numExpressions; i++) {
+        _blockValSets[i] = transformBlock.getBlockValueSet(_expressions.get(i));
+      }
+      TransformBlockDataFetcher dataFetcher =
+          new TransformBlockDataFetcher(_blockValSets, _dictionaries, _expressionResultMetadata);
+      int numDocsInBlock = transformBlock.getNumDocs();
+
+      if (selectionOnly) {
+        int docId = 0;
+        while (_rowEvents.size() < _limit && docId < numDocsInBlock) {
+          _rowEvents.add(dataFetcher.getRow(docId));
+          docId++;
+        }
+        // Count every block that was read, not only the one that filled the limit.
+        numDocsScanned += docId;
+        if (_rowEvents.size() >= _limit) {
+          break;
+        }
+      } else {
+        // Selection + order by: keep the best _maxRows rows seen so far.
+        for (int docId = 0; docId < numDocsInBlock; docId++) {
+          Serializable[] row = dataFetcher.getRow(docId);
+          if (_priorityQueue.size() < _maxRows) {
+            _priorityQueue.add(row);
+          } else if (_priorityQueue.comparator().compare(_priorityQueue.peek(), row) < 0) {
+            // The new row sorts before the current worst row: replace it.
+            _priorityQueue.poll();
+            _priorityQueue.offer(row);
+          }
+        }
+        numDocsScanned += numDocsInBlock;
+      }
+    }
+
+    // Create execution statistics.
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    // Widen before multiplying so the product cannot overflow int.
+    long numEntriesScannedPostFilter = (long) numDocsScanned * _transformOperator.getNumColumnsProjected();
+    long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
+    _executionStatistics =
+        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+            numTotalRawDocs);
+
+    if (selectionOnly) {
+      return new IntermediateResultsBlock(_dataSchema, _rowEvents);
+    }
+    return new IntermediateResultsBlock(_dataSchema, _priorityQueue);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return _executionStatistics;
+  }
+}
+
+
+
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
deleted file mode 100644
index b7ddd1e..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.query;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.query.selection.SelectionOperatorService;
-
-
-/**
- * This MSelectionOperator will take care of applying a selection query to one IndexSegment.
- * nextBlock() will return an IntermediateResultBlock for the given IndexSegment.
- *
- *
- */
-public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
- private static final String OPERATOR_NAME = "SelectionOrderByOperator";
-
- private final IndexSegment _indexSegment;
- private final ProjectionOperator _projectionOperator;
- private final Selection _selection;
- private final SelectionOperatorService _selectionOperatorService;
- private final DataSchema _dataSchema;
- private final Block[] _blocks;
- private final Set<String> _selectionColumns = new HashSet<>();
- private ExecutionStatistics _executionStatistics;
-
- public SelectionOrderByOperator(IndexSegment indexSegment, Selection selection,
- ProjectionOperator projectionOperator) {
- _indexSegment = indexSegment;
- _selection = selection;
- _projectionOperator = projectionOperator;
-
- initColumnarDataSourcePlanNodeMap(indexSegment);
- _selectionOperatorService = new SelectionOperatorService(_selection, indexSegment);
- _dataSchema = _selectionOperatorService.getDataSchema();
- _blocks = new Block[_selectionColumns.size()];
- }
-
- private void initColumnarDataSourcePlanNodeMap(IndexSegment indexSegment) {
- _selectionColumns.addAll(_selection.getSelectionColumns());
- if ((_selectionColumns.size() == 1) && ((_selectionColumns.toArray(new String[0]))[0].equals("*"))) {
- _selectionColumns.clear();
- for (String columnName : indexSegment.getPhysicalColumnNames()) {
- _selectionColumns.add(columnName);
- }
- }
- if (_selection.getSelectionSortSequence() != null) {
- for (SelectionSort selectionSort : _selection.getSelectionSortSequence()) {
- _selectionColumns.add(selectionSort.getColumn());
- }
- }
- }
-
- @Override
- protected IntermediateResultsBlock getNextBlock() {
- int numDocsScanned = 0;
-
- ProjectionBlock projectionBlock;
- while ((projectionBlock = _projectionOperator.nextBlock()) != null) {
- for (int i = 0; i < _dataSchema.size(); i++) {
- _blocks[i] = projectionBlock.getBlock(_dataSchema.getColumnName(i));
- }
- DocIdSetBlock docIdSetBlock = projectionBlock.getDocIdSetBlock();
- _selectionOperatorService.iterateOnBlocksWithOrdering(docIdSetBlock.getBlockDocIdSet().iterator(), _blocks);
- }
-
- // Create execution statistics.
- numDocsScanned += _selectionOperatorService.getNumDocsScanned();
- long numEntriesScannedInFilter = _projectionOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
- long numEntriesScannedPostFilter = numDocsScanned * _projectionOperator.getNumColumnsProjected();
- long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
- _executionStatistics =
- new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
- numTotalRawDocs);
-
- return new IntermediateResultsBlock(_selectionOperatorService.getDataSchema(), _selectionOperatorService.getRows());
- }
-
- @Override
- public String getOperatorName() {
- return OPERATOR_NAME;
- }
-
- @Override
- public ExecutionStatistics getExecutionStatistics() {
- return _executionStatistics;
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java
new file mode 100644
index 0000000..54e83be
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform;
+
+import java.io.Serializable;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.common.utils.primitive.ByteArray;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+/**
+ * Assembles rows from one block of transform results. One {@link Fetcher} is created per column,
+ * chosen from the column's result metadata (dictionary-encoded or not, single- or multi-value,
+ * data type). {@link #getRow(int)} then reads one value per column.
+ */
+public class TransformBlockDataFetcher {
+
+  private final Fetcher[] _fetchers;
+
+  /**
+   * @param blockValSets value set of each column for the current block
+   * @param dictionaries dictionary of each column; only read for columns whose metadata reports a
+   *        dictionary
+   * @param expressionResultMetadata result metadata of each column, index-aligned with the others
+   */
+  public TransformBlockDataFetcher(BlockValSet[] blockValSets, Dictionary[] dictionaries,
+      TransformResultMetadata[] expressionResultMetadata) {
+    int numColumns = blockValSets.length;
+    _fetchers = new Fetcher[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      _fetchers[i] = createFetcher(blockValSets[i], dictionaries[i], expressionResultMetadata[i]);
+    }
+  }
+
+  /**
+   * Returns the values of all columns for one row.
+   *
+   * @param docId index of the row within the current block (used as an array index by the fetchers)
+   */
+  public Serializable[] getRow(int docId) {
+    int numColumns = _fetchers.length;
+    Serializable[] row = new Serializable[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      row[i] = _fetchers[i].getValue(docId);
+    }
+    return row;
+  }
+
+  /**
+   * Picks the fetcher implementation matching the column's encoding and data type.
+   *
+   * @throws UnsupportedOperationException if no fetcher exists for the data type
+   */
+  Fetcher createFetcher(BlockValSet blockValSet,
+      Dictionary dictionary,
+      TransformResultMetadata expressionResultMetadata) {
+    DataType dataType = expressionResultMetadata.getDataType();
+    if (expressionResultMetadata.hasDictionary()) {
+      if (expressionResultMetadata.isSingleValue()) {
+        return new DictionaryBasedSVValueFetcher(dictionary, blockValSet.getDictionaryIdsSV(), dataType);
+      }
+      switch (dataType) {
+        case INT:
+          return new DictionaryBasedMVIntValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        case LONG:
+          return new DictionaryBasedMVLongValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        case FLOAT:
+          return new DictionaryBasedMVFloatValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        case DOUBLE:
+          return new DictionaryBasedMVDoubleValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        case BOOLEAN:
+        case STRING:
+          return new DictionaryBasedMVStringValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        case BYTES:
+          return new DictionaryBasedMVBytesValueFetcher(dictionary, blockValSet.getDictionaryIdsMV());
+        default:
+          break;
+      }
+    } else {
+      // NOTE(review): this branch does not look at isSingleValue(), so a multi-value result
+      // without a dictionary would be read through the single-value accessors - confirm that
+      // such results cannot occur.
+      switch (dataType) {
+        case INT:
+          return new SVIntValueFetcher(blockValSet.getIntValuesSV());
+        case LONG:
+          return new SVLongValueFetcher(blockValSet.getLongValuesSV());
+        case FLOAT:
+          return new SVFloatValueFetcher(blockValSet.getFloatValuesSV());
+        case DOUBLE:
+          return new SVDoubleValueFetcher(blockValSet.getDoubleValuesSV());
+        case BOOLEAN:
+        case STRING:
+          return new SVStringValueFetcher(blockValSet.getStringValuesSV());
+        case BYTES:
+          return new SVBytesValueFetcher(blockValSet.getBytesValuesSV());
+        default:
+          break;
+      }
+    }
+    throw new UnsupportedOperationException(
+        "Unsupported data type: " + dataType + " (singleValue: " + expressionResultMetadata.isSingleValue()
+            + ", hasDictionary: " + expressionResultMetadata.hasDictionary() + ")");
+  }
+}
+
+
+/**
+ * Reads the value of a single column for one row of the current block.
+ */
+interface Fetcher {
+
+ /**
+  * Returns the column value for the given row.
+  *
+  * @param docId position of the row; implementations use it as an index into their per-block
+  *        value (or dictionary id) arrays
+  */
+ Serializable getValue(int docId);
+}
+
+/**
+ * Fetcher for single-value INT columns that are read without a dictionary.
+ */
+class SVIntValueFetcher implements Fetcher {
+
+  private final int[] _values;
+
+  SVIntValueFetcher(int[] values) {
+    _values = values;
+  }
+
+  /** Returns the (boxed) int stored at position {@code docId}. */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for single-value LONG columns that are read without a dictionary.
+ */
+class SVLongValueFetcher implements Fetcher {
+
+  private final long[] _values;
+
+  SVLongValueFetcher(long[] values) {
+    _values = values;
+  }
+
+  /** Returns the (boxed) long stored at position {@code docId}. */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for single-value FLOAT columns that are read without a dictionary.
+ */
+class SVFloatValueFetcher implements Fetcher {
+
+  private final float[] _values;
+
+  SVFloatValueFetcher(float[] values) {
+    _values = values;
+  }
+
+  /** Returns the (boxed) float stored at position {@code docId}. */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for single-value DOUBLE columns that are read without a dictionary.
+ */
+class SVDoubleValueFetcher implements Fetcher {
+
+  private final double[] _values;
+
+  SVDoubleValueFetcher(double[] values) {
+    _values = values;
+  }
+
+  /** Returns the (boxed) double stored at position {@code docId}. */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for single-value STRING (and BOOLEAN) columns that are read without a dictionary.
+ */
+class SVStringValueFetcher implements Fetcher {
+
+  private final String[] _values;
+
+  SVStringValueFetcher(String[] values) {
+    _values = values;
+  }
+
+  /** Returns the string stored at position {@code docId}. */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for single-value BYTES columns that are read without a dictionary.
+ *
+ * <p>NOTE(review): values are returned as raw {@code byte[]}, whereas the dictionary-based
+ * single-value fetcher returns BYTES as a hex string - confirm which representation consumers
+ * expect.
+ */
+class SVBytesValueFetcher implements Fetcher {
+
+  private final byte[][] _values;
+
+  SVBytesValueFetcher(byte[][] values) {
+    _values = values;
+  }
+
+  /** Returns the byte array stored at position {@code docId} (not copied). */
+  @Override
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded single-value columns of any data type. BYTES values are
+ * returned as hex strings; every other type is returned as stored in the dictionary.
+ */
+class DictionaryBasedSVValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[] _dictionaryIds;
+  // Decided once up front instead of comparing the data type for every row.
+  private final boolean _isBytes;
+
+  DictionaryBasedSVValueFetcher(Dictionary dictionary, int[] dictionaryIds,
+      DataType dataType) {
+    _dictionary = dictionary;
+    _dictionaryIds = dictionaryIds;
+    _isBytes = dataType == FieldSpec.DataType.BYTES;
+  }
+
+  /** Looks up the dictionary id at position {@code docId} and returns the decoded value. */
+  @Override
+  public Serializable getValue(int docId) {
+    int dictId = _dictionaryIds[docId];
+    if (_isBytes) {
+      return BytesUtils.toHexString(_dictionary.getBytesValue(dictId));
+    }
+    return (Serializable) _dictionary.get(dictId);
+  }
+}
+
+
+/**
+ * Fetcher for dictionary-encoded multi-value INT columns.
+ */
+class DictionaryBasedMVIntValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVIntValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code int[]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    int[] values = new int[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getIntValue(dictIds[i]);
+    }
+    return values;
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded multi-value LONG columns.
+ */
+class DictionaryBasedMVLongValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVLongValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code long[]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    long[] values = new long[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getLongValue(dictIds[i]);
+    }
+    return values;
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded multi-value FLOAT columns.
+ */
+class DictionaryBasedMVFloatValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVFloatValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code float[]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    float[] values = new float[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getFloatValue(dictIds[i]);
+    }
+    return values;
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded multi-value DOUBLE columns.
+ */
+class DictionaryBasedMVDoubleValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVDoubleValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code double[]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    double[] values = new double[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getDoubleValue(dictIds[i]);
+    }
+    return values;
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded multi-value STRING (and BOOLEAN) columns.
+ */
+class DictionaryBasedMVStringValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVStringValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code String[]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    String[] values = new String[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getStringValue(dictIds[i]);
+    }
+    return values;
+  }
+}
+
+/**
+ * Fetcher for dictionary-encoded multi-value BYTES columns. Values are returned as raw byte
+ * arrays (unlike the single-value dictionary fetcher, which returns hex strings).
+ */
+class DictionaryBasedMVBytesValueFetcher implements Fetcher {
+
+  private final Dictionary _dictionary;
+  private final int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVBytesValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  /** Decodes every dictionary id of row {@code docId} into a new {@code byte[][]}. */
+  @Override
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    byte[][] values = new byte[dictIds.length][];
+    for (int i = 0; i < dictIds.length; i++) {
+      values[i] = _dictionary.getBytesValue(dictIds[i]);
+    }
+    return values;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index 1c0356a..2081684 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.transform;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
@@ -38,11 +39,13 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
* Class for evaluating transform expressions.
*/
public class TransformOperator extends BaseOperator<TransformBlock> {
+
private static final String OPERATOR_NAME = "TransformOperator";
private final ProjectionOperator _projectionOperator;
private final Map<String, DataSource> _dataSourceMap;
private final Map<TransformExpressionTree, TransformFunction> _transformFunctionMap = new HashMap<>();
+ private final List<TransformExpressionTree> _expressions;
/**
* Constructor for the class
@@ -51,11 +54,13 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
* @param expressions Set of expressions to evaluate
*/
public TransformOperator(@Nonnull ProjectionOperator projectionOperator,
- @Nonnull Set<TransformExpressionTree> expressions) {
+ @Nonnull List<TransformExpressionTree> expressions) {
_projectionOperator = projectionOperator;
+ _expressions = expressions;
_dataSourceMap = projectionOperator.getDataSourceMap();
for (TransformExpressionTree expression : expressions) {
- TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+ TransformFunction transformFunction = TransformFunctionFactory
+ .get(expression, _dataSourceMap);
_transformFunctionMap.put(expression, transformFunction);
}
}
@@ -109,4 +114,8 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
public ExecutionStatistics getExecutionStatistics() {
return _projectionOperator.getExecutionStatistics();
}
+
+ public List<TransformExpressionTree> getExpressions() {
+ return _expressions;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
index a09c004..075d2aa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index 4a4449b..ef6db3b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -23,54 +23,38 @@ import org.apache.pinot.common.request.Selection;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.operator.query.SelectionOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a single segment.
+ * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a
+ * single segment.
*/
public class SelectionPlanNode implements PlanNode {
- private static final Logger LOGGER = LoggerFactory.getLogger(SelectionPlanNode.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SelectionPlanNode.class);
private final IndexSegment _indexSegment;
private final Selection _selection;
- private final ProjectionPlanNode _projectionPlanNode;
+ private TransformPlanNode _transformPlanNode;
public SelectionPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) {
_indexSegment = indexSegment;
_selection = brokerRequest.getSelections();
-
if (_selection.getSize() > 0) {
- int maxDocPerNextCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
-
- // No ordering required, select minimum number of documents
- if (!_selection.isSetSelectionSortSequence()) {
- maxDocPerNextCall = Math.min(_selection.getOffset() + _selection.getSize(), maxDocPerNextCall);
- }
-
- DocIdSetPlanNode docIdSetPlanNode = new DocIdSetPlanNode(_indexSegment, brokerRequest, maxDocPerNextCall);
- _projectionPlanNode = new ProjectionPlanNode(_indexSegment,
- SelectionOperatorUtils.extractSelectionRelatedColumns(_selection, indexSegment), docIdSetPlanNode);
+ _transformPlanNode = new TransformPlanNode(_indexSegment, brokerRequest);
} else {
- _projectionPlanNode = null;
+ _transformPlanNode = null;
}
}
@Override
public Operator run() {
- if (_selection.getSize() > 0) {
- if (_selection.isSetSelectionSortSequence()) {
- return new SelectionOrderByOperator(_indexSegment, _selection, _projectionPlanNode.run());
- } else {
- return new SelectionOnlyOperator(_indexSegment, _selection, _projectionPlanNode.run());
- }
- } else {
+ if (_selection.getSize() <= 0) {
return new EmptySelectionOperator(_indexSegment, _selection);
}
+ return new SelectionOperator(_indexSegment, _selection, _transformPlanNode.run());
}
@Override
@@ -88,8 +72,8 @@ public class SelectionPlanNode implements PlanNode {
LOGGER.debug(prefix + "Argument 0: IndexSegment - " + _indexSegment.getSegmentName());
LOGGER.debug(prefix + "Argument 1: Selections - " + _selection);
if (_selection.getSize() > 0) {
- LOGGER.debug(prefix + "Argument 2: Projection -");
- _projectionPlanNode.showTree(prefix + " ");
+ LOGGER.debug(prefix + "Argument 2: Transform -");
+ _transformPlanNode.showTree(prefix + " ");
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index 8dac5be..c1e19d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -18,15 +18,24 @@
*/
package org.apache.pinot.core.plan;
+import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.getSelectionColumns;
+
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.SelectionSort;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.operator.transform.TransformOperator;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +50,8 @@ public class TransformPlanNode implements PlanNode {
private final String _segmentName;
private final ProjectionPlanNode _projectionPlanNode;
private final Set<String> _projectionColumns = new HashSet<>();
- private final Set<TransformExpressionTree> _expressionTrees = new HashSet<>();
+ private final Set<TransformExpressionTree> _expressionTrees = new LinkedHashSet<>();
+ private int _maxDocPerNextCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
/**
* Constructor for the class
@@ -51,17 +61,19 @@ public class TransformPlanNode implements PlanNode {
*/
public TransformPlanNode(@Nonnull IndexSegment indexSegment, @Nonnull BrokerRequest brokerRequest) {
_segmentName = indexSegment.getSegmentName();
- extractColumnsAndTransforms(brokerRequest);
+ extractColumnsAndTransforms(brokerRequest, indexSegment);
_projectionPlanNode =
- new ProjectionPlanNode(indexSegment, _projectionColumns, new DocIdSetPlanNode(indexSegment, brokerRequest));
+ new ProjectionPlanNode(indexSegment, _projectionColumns, new DocIdSetPlanNode(indexSegment, brokerRequest, _maxDocPerNextCall));
}
/**
* Helper method to extract projection columns and transform expressions from the given broker request.
*
* @param brokerRequest Broker request to process
+ * @param indexSegment Index segment whose physical column names are used to expand a "*" selection
*/
- private void extractColumnsAndTransforms(@Nonnull BrokerRequest brokerRequest) {
+ private void extractColumnsAndTransforms(@Nonnull BrokerRequest brokerRequest,
+ IndexSegment indexSegment) {
if (brokerRequest.isSetAggregationsInfo()) {
for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
@@ -81,15 +93,35 @@ public class TransformPlanNode implements PlanNode {
}
}
} else {
- throw new UnsupportedOperationException("Transforms not supported in selection queries.");
- // TODO: Add transform support.
- // projectionColumns.addAll(brokerRequest.getSelections().getSelectionColumns());
+ Selection selection = brokerRequest.getSelections();
+ // No ordering required, select minimum number of documents
+ if (!selection.isSetSelectionSortSequence()) {
+ _maxDocPerNextCall = Math.min(selection.getOffset() + selection.getSize(), _maxDocPerNextCall);
+ }
+ List<String> expressions = selection.getSelectionColumns();
+ if (expressions.size() == 1 && expressions.get(0).equals("*")) {
+ expressions = new LinkedList<>(indexSegment.getPhysicalColumnNames());
+ Collections.sort(expressions);
+ }
+ if (selection.getSelectionSortSequence() != null) {
+ for (SelectionSort selectionSort : selection.getSelectionSortSequence()) {
+ String expression = selectionSort.getColumn();
+ if(!expressions.contains(expression)) {
+ expressions.add(expression);
+ }
+ }
+ }
+ for (String expression : expressions) {
+ TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
+ transformExpressionTree.getColumns(_projectionColumns);
+ _expressionTrees.add(transformExpressionTree);
+ }
}
}
@Override
public TransformOperator run() {
- return new TransformOperator(_projectionPlanNode.run(), _expressionTrees);
+ return new TransformOperator(_projectionPlanNode.run(), new ArrayList<>(_expressionTrees));
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index bc62a20..56a20fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -38,7 +38,7 @@ import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.plan.SelectionPlanNode;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
index b43d03e..c9d09b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
@@ -24,7 +24,7 @@ import javax.annotation.Nonnull;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
index 8f4600b..75e9bbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index e1f1d69..23271c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.exception.BadQueryRequestException;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index b731bf9..a7cfd25 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Locale;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.core.plan.AggregationFunctionInitializer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 6aef5f0..5f43415 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
index 6959290..cd0286c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
index c3ace67..4184be3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
index 50f3648..eecc0cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 607c6c3..60b70c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index f35709c..ee04cf0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -22,6 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
index f8a1d8c..1dabb6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index b951b2c..431cd39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
index ce7cae2..f07b7ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
index 1da478d..fac31f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
index c071d6f..1cf3940 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
index 1060b8b..3fff0ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
index 655382a..48ab679 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
index b1881c1..c01b2b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
index f753a04..3525f48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 717817c..259b299 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
index b9fa7a3..391016c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
index 0e3b09a..cf51998 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.util.Arrays;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
index 3c2c91b..18ab698 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
index 25bb816..572d9a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
index 74d7458..77fec93 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
index 0d4db7f..a3db710 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
@@ -22,6 +22,7 @@ import com.tdunning.math.stats.TDigest;
import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
index 9b7966e..2ee71ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
import com.tdunning.math.stats.TDigest;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
index 1071022..fed2722 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
index e0035f8..e2f423b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.aggregation.function;
import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index 6ccb599..70e3806 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 5c4bb95..083db4d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.request;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
@@ -31,7 +32,7 @@ import org.apache.pinot.common.request.Selection;
import org.apache.pinot.common.request.transform.TransformExpressionTree;
import org.apache.pinot.common.utils.request.FilterQueryTree;
import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.TimerContext;
@@ -62,6 +63,7 @@ public class ServerQueryRequest {
private final Set<TransformExpressionTree> _groupByExpressions;
private final Set<String> _groupByColumns;
private final Set<String> _selectionColumns;
+ private final Set<TransformExpressionTree> _selectionExpressions;
// Query processing context
private volatile int _segmentCountAfterPruning = -1;
@@ -121,10 +123,17 @@ public class ServerQueryRequest {
// Selection
Selection selection = _brokerRequest.getSelections();
if (selection != null) {
- _selectionColumns = RequestUtils.extractSelectionColumns(selection);
+ _selectionExpressions = new LinkedHashSet<>();
+ Set<String> selectionColumns = RequestUtils.extractSelectionColumns(selection);
+ for (String expression : selectionColumns) {
+ _selectionExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+ }
+ _selectionColumns = RequestUtils.extractColumnsFromExpressions(_selectionExpressions);
_allColumns.addAll(_selectionColumns);
+
} else {
_selectionColumns = null;
+ _selectionExpressions = null;
}
}
@@ -194,4 +203,9 @@ public class ServerQueryRequest {
public Set<String> getSelectionColumns() {
return _selectionColumns;
}
+
+ @Nullable
+ public Set<TransformExpressionTree> getSelectionExpressions() {
+ return _selectionExpressions;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 783cdc5..a24f441 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -76,7 +76,7 @@ public class SelectionOperatorUtils {
private static final DecimalFormatSymbols DECIMAL_FORMAT_SYMBOLS = DecimalFormatSymbols.getInstance(Locale.US);
/**
- * Expand <code>'SELECT *'</code> to select all columns with {@link IndexSegment}, order all columns alphabatically.
+ * Expand <code>'SELECT *'</code> to select all columns with {@link IndexSegment}, order all columns alphabetically.
* (Inner segment)
*
* @param selectionColumns unexpanded selection columns (may contain '*').
@@ -88,8 +88,6 @@ public class SelectionOperatorUtils {
@Nonnull IndexSegment indexSegment) {
if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
List<String> allColumns = new LinkedList<>(indexSegment.getPhysicalColumnNames());
- Set<String> columnNames = indexSegment.getPhysicalColumnNames();
-
Collections.sort(allColumns);
return allColumns;
} else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
index e170a43..75146ef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
@@ -24,7 +24,7 @@ import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
index 042c37a..af0e83b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.startree.StarTreeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
index 10c2276..4463dcd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.startree.plan;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -65,7 +66,7 @@ public class StarTreeTransformPlanNode implements PlanNode {
@Override
public TransformOperator run() {
- return new TransformOperator(_starTreeProjectionPlanNode.run(), _groupByExpressions);
+ return new TransformOperator(_starTreeProjectionPlanNode.run(), new ArrayList<>(_groupByExpressions));
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
index 75a7b6c..8056187 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
@@ -19,7 +19,7 @@
package org.apache.pinot.core.startree.v2;
import javax.annotation.Nonnull;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
public class AggregationFunctionColumnPair {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
index 596f550..9c7cb0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
@@ -37,7 +37,7 @@ import org.apache.pinot.core.data.aggregator.ValueAggregatorFactory;
import org.apache.pinot.core.data.readers.PinotSegmentColumnReader;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory.CompressionType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.segment.creator.SingleValueRawIndexCreator;
import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
index 074eea5..337fdd6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
@@ -31,7 +31,7 @@ import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.segment.StarTreeMetadata;
import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.data.aggregator.ValueAggregatorFactory;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.segment.index.ColumnMetadata;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index 7310601..7b4456a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
index 0f34566..fd4f338 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
index 0264b3b..793cb30 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.core.startree.v2;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index fb71bb1..372fb19 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -51,7 +51,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.plan.FilterPlanNode;
import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 2bdd287..f8f8b9e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -19,14 +19,15 @@
package org.apache.pinot.queries;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -49,11 +50,13 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 10);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
// Test query with filter
@@ -65,11 +68,12 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
Assert.assertEquals(selectionDataSchema.size(), 10);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
}
@@ -78,7 +82,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
String query = "SELECT * FROM testTable";
// Test query without filter
- SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -86,17 +90,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 10);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 10);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
- Assert.assertEquals(firstRow[5], new int[]{2147483647});
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+ Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
// Test query with filter
selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -107,17 +113,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 10);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 10);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
- Assert.assertEquals(firstRow[5], new int[]{2147483647});
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+ Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
}
@Test
@@ -125,7 +133,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
String query = "SELECT" + SELECTION + " FROM testTable";
// Test query without filter
- SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -133,17 +141,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 3);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(2), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(2), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 3);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
- Assert.assertEquals(firstRow[2], new int[]{2147483647});
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+ Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
// Test query with filter
selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -154,17 +164,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 3);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(2), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(2), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 3);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
- Assert.assertEquals(firstRow[2], new int[]{2147483647});
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+ Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
}
@Test
@@ -172,7 +184,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
// Test query without filter
- SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 100000L);
@@ -180,17 +192,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 400000L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 4);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column5");
- Assert.assertEquals(selectionDataSchema.getColumnName(3), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.STRING);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(3), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 4);
- Assert.assertEquals((String) lastRow[0], "AKXcXcIqsqOJFsdwxZ");
- Assert.assertEquals(lastRow[3], new int[]{1252});
+ Assert.assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
+ Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
// Test query with filter
selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -201,16 +215,27 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 62480L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 4);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column5");
- Assert.assertEquals(selectionDataSchema.getColumnName(3), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.STRING);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(3), DataSchema.ColumnDataType.INT_ARRAY);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 4);
- Assert.assertEquals((String) lastRow[0], "AKXcXcIqsqOJFsdwxZ");
- Assert.assertEquals(lastRow[3], new int[]{2147483647});
+ Assert.assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
+ Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
+ }
+
+ private Map<String, Integer> computeColumnNameToIndexMap(DataSchema dataSchema) {
+ Map<String, Integer> columnIndexMap = new HashMap<>();
+
+ for (int i = 0; i < dataSchema.size(); i++) {
+ columnIndexMap.put(dataSchema.getColumnName(i), i);
+ }
+ return columnIndexMap;
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 835975e..a731f72 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -19,14 +19,15 @@
package org.apache.pinot.queries;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -49,11 +50,12 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
Assert.assertEquals(selectionDataSchema.size(), 11);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
// Test query with filter
@@ -65,11 +67,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 11);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
}
@@ -78,7 +82,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
String query = "SELECT * FROM testTable";
// Test query without filter
- SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -86,18 +90,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
Assert.assertEquals(selectionDataSchema.size(), 11);
Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 11);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 1578964907);
- Assert.assertEquals((String) firstRow[1], "P");
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 1578964907);
+ Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "P");
// Test query with filter
selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -108,18 +113,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
Assert.assertEquals(selectionDataSchema.size(), 11);
Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 11);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 351823652);
- Assert.assertEquals((String) firstRow[1], "t");
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 351823652);
+ Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "t");
}
@Test
@@ -127,7 +133,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
String query = "SELECT" + SELECTION + " FROM testTable";
// Test query without filter
- SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -135,11 +141,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 3);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(2), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] firstRow = selectionResult.get(0);
@@ -156,17 +164,18 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
Assert.assertEquals(selectionDataSchema.size(), 3);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnName(2), "column11");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertTrue(columnIndexMap.containsKey("column11"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
firstRow = selectionResult.get(0);
Assert.assertEquals(firstRow.length, 3);
- Assert.assertEquals(((Integer) firstRow[0]).intValue(), 351823652);
- Assert.assertEquals((String) firstRow[2], "t");
+ Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 351823652);
+ Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "t");
}
@Test
@@ -174,7 +183,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
// Test query without filter
- SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -182,17 +191,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 120000L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 4);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 4);
- Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
- Assert.assertEquals(((Integer) lastRow[1]).intValue(), 10542595);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
// Test query with filter
selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -203,17 +214,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 24516L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(selectionDataSchema.size(), 4);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 4);
- Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
- Assert.assertEquals(((Integer) lastRow[1]).intValue(), 462769197);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
}
@Test
@@ -221,7 +234,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
String query = "SELECT * " + " FROM testTable" + ORDER_BY;
// Test query without filter
- SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+ BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -229,18 +242,20 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 330000L);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
Assert.assertEquals(selectionDataSchema.size(), 11);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
Serializable[] lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 11);
- Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
- Assert.assertEquals(((Integer) lastRow[1]).intValue(), 10542595);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
// Test query with filter
selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -251,18 +266,20 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 67419);
Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
selectionDataSchema = resultsBlock.getSelectionDataSchema();
+ columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
Assert.assertEquals(selectionDataSchema.size(), 11);
- Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
- Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
- Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
- Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+ Assert.assertTrue(columnIndexMap.containsKey("column6"));
+ Assert.assertTrue(columnIndexMap.containsKey("column1"));
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+ Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
Assert.assertEquals(selectionResult.size(), 10);
lastRow = selectionResult.peek();
Assert.assertEquals(lastRow.length, 11);
- Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
- Assert.assertEquals(((Integer) lastRow[1]).intValue(), 462769197);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+ Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
}
private int getVirtualColumns(DataSchema selectionDataSchema) {
@@ -274,4 +291,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
}
return virtualCols;
}
+
+ private Map<String, Integer> computeColumnNameToIndexMap(DataSchema dataSchema) {
+ Map<String, Integer> columnIndexMap = new HashMap<>();
+
+ for (int i = 0; i < dataSchema.size(); i++) {
+ columnIndexMap.put(dataSchema.getColumnName(i), i);
+ }
+ return columnIndexMap;
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
index 62402aa..2d8e0ea 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
@@ -138,7 +138,7 @@ public class DefaultAggregationExecutorTest {
MatchAllFilterOperator matchAllFilterOperator = new MatchAllFilterOperator(totalRawDocs);
DocIdSetOperator docIdSetOperator = new DocIdSetOperator(matchAllFilterOperator, DocIdSetPlanNode.MAX_DOC_PER_CALL);
ProjectionOperator projectionOperator = new ProjectionOperator(dataSourceMap, docIdSetOperator);
- TransformOperator transformOperator = new TransformOperator(projectionOperator, expressionTrees);
+ TransformOperator transformOperator = new TransformOperator(projectionOperator, new ArrayList<>(expressionTrees));
TransformBlock transformBlock = transformOperator.nextBlock();
int numAggFuncs = _aggregationInfoList.size();
AggregationFunctionContext[] aggrFuncContextArray = new AggregationFunctionContext[numAggFuncs];
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ac6a445..8d9d144 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
@@ -40,7 +41,9 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.plan.SelectionPlanNode;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -135,11 +138,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private void registerCallbackHandlers() {
List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
- instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance.startsWith(
- CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
+ instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance
+ .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_clusterName);
- resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
- && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
+ resourcesInCluster.removeIf(
+ resource -> (!TableNameBuilder.isTableResource(resource) && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE
+ .equals(resource)));
for (String instance : instances) {
List<String> resourcesToMonitor = new ArrayList<>();
for (String resourceName : resourcesInCluster) {
@@ -151,11 +155,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
}
}
- _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
- new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName, instance,
- resourcesToMonitor, 100.0),
- new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance,
- resourcesToMonitor, 100.0))));
+ _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
+ .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
+ instance, resourcesToMonitor, 100.0),
+ new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
+ instance, resourcesToMonitor, 100.0))));
}
}
@@ -459,7 +463,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testUDF()
+ public void testGroupByUDF()
throws Exception {
String pqlQuery = "SELECT COUNT(*) FROM mytable GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS')";
JsonNode response = postQuery(pqlQuery);
@@ -532,6 +536,68 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(aggregationResult.get("value").asDouble(), 16071.0 / 2);
}
+ @Test
+ public void testAggregationUDF()
+ throws Exception {
+
+ String pqlQuery = "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable";
+ JsonNode response = postQuery(pqlQuery);
+ JsonNode aggregationResult = response.get("aggregationResults").get(0);
+ assertEquals(aggregationResult.get("function").asText(), "max_timeconvert(DaysSinceEpoch,'DAYS','SECONDS')");
+ assertEquals(aggregationResult.get("value").asDouble(), 16435.0 * 24 * 3600);
+
+ pqlQuery = "SELECT MIN(div(DaysSinceEpoch,2)) FROM mytable";
+ response = postQuery(pqlQuery);
+ aggregationResult = response.get("aggregationResults").get(0);
+ assertEquals(aggregationResult.get("function").asText(), "min_div(DaysSinceEpoch,'2')");
+ assertEquals(aggregationResult.get("value").asDouble(), 16071.0 / 2);
+ }
+
+ @Test
+ public void testSelectionUDF()
+ throws Exception {
+ String pqlQuery = "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable";
+ JsonNode response = postQuery(pqlQuery);
+ ArrayNode selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ for (int i = 0; i < selectionResults.size(); i++) {
+ long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+ long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+ Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+ }
+
+ pqlQuery =
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000";
+ response = postQuery(pqlQuery);
+ selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ long prevValue = -1;
+ for (int i = 0; i < selectionResults.size(); i++) {
+ long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+ long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+ Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+ Assert.assertTrue(daysSinceEpoch >= prevValue);
+ prevValue = daysSinceEpoch;
+ }
+
+ pqlQuery =
+ "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000";
+ response = postQuery(pqlQuery);
+ selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ prevValue = Long.MAX_VALUE;
+ for (int i = 0; i < selectionResults.size(); i++) {
+ long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+ long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+ Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+ Assert.assertTrue(secondsSinceEpoch <= prevValue);
+ prevValue = secondsSinceEpoch;
+ }
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
index 2a13eb6..5485dc3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
@@ -29,7 +29,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.tools.query.comparison.SegmentInfoProvider;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org