You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2019/06/25 18:40:47 UTC

[incubator-pinot] branch master updated: Adding support for udfs in Selection (#4346)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b75a4c0  Adding support for udfs in Selection (#4346)
b75a4c0 is described below

commit b75a4c00169d68f9aabc87b01424664b73081ed3
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Tue Jun 25 11:40:43 2019 -0700

    Adding support for udfs in Selection (#4346)
    
    * Adding support for udfs in Selection (#22)
    
    * Adding support for udfs in Selection
    
    * Addressing review comments
    
    * Adding license headers
    
    * Adding support for bytes type in order by
    
    * Minor fixes for selection-udf. Adding test cases (#25)
    
    * More fixes for UDF in order by
    
    * Fixing bugs in Order By. Enhancing test case to be independent of the column order in the result
    
    * Addressing review comments and deleting old SelectionOnly and SelectionOrderBy operators
    
    * Removing commented line
---
 .../common}/function/AggregationFunctionType.java  |   5 +-
 .../function/FunctionDefinitionRegistry.java       |  36 +++
 .../pinot/common/utils/request/RequestUtils.java   |   3 +-
 .../parsers/PinotQuery2BrokerRequestConverter.java |  19 +-
 .../pinot/pql/parsers/pql2/ast/OrderByAstNode.java |   5 +-
 .../pql/parsers/pql2/ast/OutputColumnAstNode.java  |  20 +-
 .../core/data/aggregator/AvgValueAggregator.java   |   2 +-
 .../core/data/aggregator/CountValueAggregator.java |   2 +-
 .../DistinctCountHLLValueAggregator.java           |   2 +-
 .../core/data/aggregator/MaxValueAggregator.java   |   2 +-
 .../aggregator/MinMaxRangeValueAggregator.java     |   2 +-
 .../core/data/aggregator/MinValueAggregator.java   |   2 +-
 .../aggregator/PercentileEstValueAggregator.java   |   2 +-
 .../PercentileTDigestValueAggregator.java          |   2 +-
 .../core/data/aggregator/SumValueAggregator.java   |   2 +-
 .../core/data/aggregator/ValueAggregator.java      |   2 +-
 .../data/aggregator/ValueAggregatorFactory.java    |   2 +-
 .../query/DictionaryBasedAggregationOperator.java  |   2 +-
 .../query/MetadataBasedAggregationOperator.java    |   2 +-
 .../core/operator/query/SelectionOnlyOperator.java | 109 -------
 .../core/operator/query/SelectionOperator.java     | 282 +++++++++++++++++
 .../operator/query/SelectionOrderByOperator.java   | 116 -------
 .../transform/TransformBlockDataFetcher.java       | 340 +++++++++++++++++++++
 .../core/operator/transform/TransformOperator.java |  13 +-
 .../plan/MetadataBasedAggregationPlanNode.java     |   2 +-
 .../apache/pinot/core/plan/SelectionPlanNode.java  |  38 +--
 .../apache/pinot/core/plan/TransformPlanNode.java  |  50 ++-
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   2 +-
 .../aggregation/DefaultAggregationExecutor.java    |   2 +-
 .../aggregation/function/AggregationFunction.java  |   1 +
 .../function/AggregationFunctionFactory.java       |   1 +
 .../function/AggregationFunctionUtils.java         |   1 +
 .../function/AvgAggregationFunction.java           |   1 +
 .../function/AvgMVAggregationFunction.java         |   1 +
 .../function/CountAggregationFunction.java         |   1 +
 .../function/CountMVAggregationFunction.java       |   1 +
 .../function/DistinctCountAggregationFunction.java |   1 +
 .../DistinctCountHLLAggregationFunction.java       |   1 +
 .../DistinctCountHLLMVAggregationFunction.java     |   1 +
 .../DistinctCountMVAggregationFunction.java        |   1 +
 .../DistinctCountRawHLLAggregationFunction.java    |   1 +
 .../DistinctCountRawHLLMVAggregationFunction.java  |   1 +
 .../function/FastHLLAggregationFunction.java       |   1 +
 .../function/MaxAggregationFunction.java           |   1 +
 .../function/MaxMVAggregationFunction.java         |   1 +
 .../function/MinAggregationFunction.java           |   1 +
 .../function/MinMVAggregationFunction.java         |   1 +
 .../function/MinMaxRangeAggregationFunction.java   |   1 +
 .../function/MinMaxRangeMVAggregationFunction.java |   1 +
 .../function/PercentileAggregationFunction.java    |   1 +
 .../function/PercentileEstAggregationFunction.java |   1 +
 .../PercentileEstMVAggregationFunction.java        |   1 +
 .../function/PercentileMVAggregationFunction.java  |   1 +
 .../PercentileTDigestAggregationFunction.java      |   1 +
 .../PercentileTDigestMVAggregationFunction.java    |   1 +
 .../function/SumAggregationFunction.java           |   1 +
 .../function/SumMVAggregationFunction.java         |   1 +
 .../groupby/DefaultGroupByExecutor.java            |   2 +-
 .../core/query/request/ServerQueryRequest.java     |  18 +-
 .../query/selection/SelectionOperatorUtils.java    |   4 +-
 .../executor/StarTreeAggregationExecutor.java      |   2 +-
 .../startree/executor/StarTreeGroupByExecutor.java |   2 +-
 .../startree/plan/StarTreeTransformPlanNode.java   |   3 +-
 .../startree/v2/AggregationFunctionColumnPair.java |   2 +-
 .../startree/v2/builder/BaseSingleTreeBuilder.java |   2 +-
 .../startree/v2/store/StarTreeLoaderUtils.java     |   2 +-
 .../function/AggregationFunctionFactoryTest.java   |   1 +
 .../function/AggregationFunctionTypeTest.java      |   1 +
 .../v2/AggregationFunctionColumnPairTest.java      |   2 +-
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java |   2 +-
 ...InnerSegmentSelectionMultiValueQueriesTest.java | 123 +++++---
 ...nnerSegmentSelectionSingleValueQueriesTest.java | 146 +++++----
 .../DefaultAggregationExecutorTest.java            |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |  86 +++++-
 .../tests/StarTreeV2ClusterIntegrationTest.java    |   2 +-
 75 files changed, 1072 insertions(+), 426 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
similarity index 93%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java
rename to pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index d4b7ef8..03b39c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation.function;
+package org.apache.pinot.common.function;
 
 import javax.annotation.Nonnull;
-import org.apache.pinot.core.query.exception.BadQueryRequestException;
 
 
 public enum AggregationFunctionType {
@@ -99,7 +98,7 @@ public enum AggregationFunctionType {
         return AggregationFunctionType.valueOf(upperCaseFunctionName);
       }
     } catch (Exception e) {
-      throw new BadQueryRequestException("Invalid aggregation function name: " + functionName);
+      throw new UnsupportedOperationException("Invalid aggregation function name: " + functionName);
     }
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java
new file mode 100644
index 0000000..f651162
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionDefinitionRegistry.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.common.function;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Place where all functions are registered.
+ */
+public class FunctionDefinitionRegistry {
+
+  public static boolean isAggFunc(String functionName) {
+
+    try {
+      AggregationFunctionType.getAggregationFunctionType(functionName);
+      return true;
+    } catch (Exception e) {
+      //ignore
+    }
+    return false;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 1b3f110..58a2074 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.request;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -248,7 +249,7 @@ public class RequestUtils {
    * Extracts all columns from the given selection, '*' will be ignored.
    */
   public static Set<String> extractSelectionColumns(Selection selection) {
-    Set<String> selectionColumns = new HashSet<>();
+    Set<String> selectionColumns = new LinkedHashSet<>();
     for (String selectionColumn : selection.getSelectionColumns()) {
       if (!selectionColumn.equals("*")) {
         selectionColumns.add(selectionColumn);
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
index 7724d87..a157d8e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.function.FunctionDefinitionRegistry;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
@@ -127,11 +128,21 @@ public class PinotQuery2BrokerRequestConverter {
           selection.addToSelectionColumns(expression.getIdentifier().getName());
           break;
         case FUNCTION:
-          AggregationInfo aggInfo = buildAggregationInfo(expression.getFunctionCall());
-          if (aggregationInfoList == null) {
-            aggregationInfoList = new ArrayList<>();
+
+          Function functionCall = expression.getFunctionCall();
+          String functionName = functionCall.getOperator();
+          if (FunctionDefinitionRegistry.isAggFunc(functionName)) {
+            AggregationInfo aggInfo = buildAggregationInfo(functionCall);
+            if (aggregationInfoList == null) {
+              aggregationInfoList = new ArrayList<>();
+            }
+            aggregationInfoList.add(aggInfo);
+          } else {
+            if (selection == null) {
+              selection = new Selection();
+            }
+            selection.addToSelectionColumns(standardizeExpression(expression, false));
           }
-          aggregationInfoList.add(aggInfo);
           break;
       }
     }
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
index ee0677a..e776c24 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OrderByAstNode.java
@@ -24,6 +24,7 @@ import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.pql.parsers.Pql2CompilationException;
 
@@ -40,7 +41,9 @@ public class OrderByAstNode extends BaseAstNode {
       if (astNode instanceof OrderByExpressionAstNode) {
         OrderByExpressionAstNode node = (OrderByExpressionAstNode) astNode;
         SelectionSort elem = new SelectionSort();
-        elem.setColumn(node.getColumn());
+        TransformExpressionTree transformExpressionTree =
+            TransformExpressionTree.compileToExpressionTree(node.getColumn());
+        elem.setColumn(transformExpressionTree.toString());
         elem.setIsAsc("asc".equalsIgnoreCase(node.getOrdering()));
         selections.addToSelectionSortSequence(elem);
       } else {
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
index 3798fbd..d53e201 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OutputColumnAstNode.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.pql.parsers.pql2.ast;
 
+import org.apache.pinot.common.function.FunctionDefinitionRegistry;
+import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.pql.parsers.Pql2CompilationException;
 
@@ -33,10 +36,21 @@ public class OutputColumnAstNode extends BaseAstNode {
   @Override
   public void updateBrokerRequest(BrokerRequest brokerRequest) {
     for (AstNode astNode : getChildren()) {
-      // If the column is a function call, it must be an aggregation function
       if (astNode instanceof FunctionCallAstNode) {
-        FunctionCallAstNode node = (FunctionCallAstNode) astNode;
-        brokerRequest.addToAggregationsInfo(node.buildAggregationInfo());
+        String functionName = ((FunctionCallAstNode) astNode).getName();
+        if (FunctionDefinitionRegistry.isAggFunc(functionName)) {
+          FunctionCallAstNode node = (FunctionCallAstNode) astNode;
+          brokerRequest.addToAggregationsInfo(node.buildAggregationInfo());
+        } else {
+          Selection selection = brokerRequest.getSelections();
+          if (selection == null) {
+            selection = new Selection();
+            brokerRequest.setSelections(selection);
+          }
+          TransformExpressionTree transformExpressionTree =
+              TransformExpressionTree.compileToExpressionTree(((FunctionCallAstNode) astNode).getExpression());
+          selection.addToSelectionColumns(transformExpressionTree.toString());
+        }
       } else if (astNode instanceof IdentifierAstNode) {
         Selection selection = brokerRequest.getSelections();
         if (selection == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
index e5841f7..7e57ddb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/AvgValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
index dfd130b..820e303 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/CountValueAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class CountValueAggregator implements ValueAggregator<Void, Long> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
index 15ad278..5b5e8c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/DistinctCountHLLValueAggregator.java
@@ -22,7 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.DistinctCountHLLAggregationFunction;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
index 24ee224..b6b2280 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MaxValueAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class MaxValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
index a6a8d55..60e2d94 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinMaxRangeValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
index b5153a7..5b15f9a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/MinValueAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class MinValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
index a57f324..857f54c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileEstValueAggregator.java
@@ -20,7 +20,7 @@ package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
index 9d398eb..6988921 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/PercentileTDigestValueAggregator.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.aggregator;
 import com.tdunning.math.stats.TDigest;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
index fcbc596..7712b40 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/SumValueAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class SumValueAggregator implements ValueAggregator<Number, Double> {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
index 67ae929..7abcf1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
index eb3c688..2b4946d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/aggregator/ValueAggregatorFactory.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.data.aggregator;
 
 import org.apache.pinot.common.data.FieldSpec.DataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 192d0b8..0bb3de5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.DoubleAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
index 63b4a9a..5c9c36c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/MetadataBasedAggregationOperator.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
deleted file mode 100644
index 01615ce..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOnlyOperator.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.query;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.query.selection.SelectionFetcher;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
-
-
-/**
- * This SelectionOnlyOperator will take care of applying a selection query to one IndexSegment.
- * nextBlock() will return an IntermediateResultBlock for the given IndexSegment.
- *
- *
- */
-public class SelectionOnlyOperator extends BaseOperator<IntermediateResultsBlock> {
-  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
-
-  private final IndexSegment _indexSegment;
-  private final ProjectionOperator _projectionOperator;
-  private final DataSchema _dataSchema;
-  private final Block[] _blocks;
-  private final int _limitDocs;
-  private final Collection<Serializable[]> _rowEvents;
-  private ExecutionStatistics _executionStatistics;
-
-  public SelectionOnlyOperator(IndexSegment indexSegment, Selection selection, ProjectionOperator projectionOperator) {
-    _indexSegment = indexSegment;
-    _limitDocs = selection.getSize();
-    _projectionOperator = projectionOperator;
-    List<String> selectionColumns =
-        SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), indexSegment);
-    _dataSchema = SelectionOperatorUtils.extractDataSchema(null, selectionColumns, indexSegment);
-    _blocks = new Block[selectionColumns.size()];
-    _rowEvents = new ArrayList<>();
-  }
-
-  @Override
-  protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
-    ProjectionBlock projectionBlock;
-    while ((projectionBlock = _projectionOperator.nextBlock()) != null) {
-      for (int i = 0; i < _dataSchema.size(); i++) {
-        _blocks[i] = projectionBlock.getBlock(_dataSchema.getColumnName(i));
-      }
-      SelectionFetcher selectionFetcher = new SelectionFetcher(_blocks, _dataSchema);
-      DocIdSetBlock docIdSetBlock = projectionBlock.getDocIdSetBlock();
-      int numDocsToFetch = Math.min(docIdSetBlock.getSearchableLength(), _limitDocs - _rowEvents.size());
-      numDocsScanned += numDocsToFetch;
-      int[] docIdSet = docIdSetBlock.getDocIdSet();
-      for (int i = 0; i < numDocsToFetch; i++) {
-        _rowEvents.add(selectionFetcher.getRow(docIdSet[i]));
-      }
-      if (_rowEvents.size() == _limitDocs) {
-        break;
-      }
-    }
-
-    // Create execution statistics.
-    long numEntriesScannedInFilter = _projectionOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * _projectionOperator.getNumColumnsProjected();
-    long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
-            numTotalRawDocs);
-
-    return new IntermediateResultsBlock(_dataSchema, _rowEvents);
-  }
-
-  @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
-  }
-
-  @Override
-  public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java
new file mode 100644
index 0000000..c20d194
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.primitive.ByteArray;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformBlockDataFetcher;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * This SelectionOnlyOperator will take care of applying a selection query to one IndexSegment.
+ * nextBlock() will return an IntermediateResultBlock for the given IndexSegment.
+ */
+public class SelectionOperator extends BaseOperator<IntermediateResultsBlock> {
+
+  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+
+  private final IndexSegment _indexSegment;
+  private final TransformOperator _transformOperator;
+  private final DataSchema _dataSchema;
+  private final BlockValSet[] _blockValSets;
+  private final int _limit;
+  private Selection _selection;
+  private final int _offset;
+  private final int _maxRows;
+  private final List<TransformExpressionTree> _expressions;
+  private final List<TransformExpressionTree> _selectExpressions;
+  private final List<TransformExpressionTree> _orderByExpressions;
+  private final List<Integer> _orderByIndices;
+  private final TransformResultMetadata[] _expressionResultMetadata;
+  private final Dictionary[] _dictionaries;
+  private Collection<Serializable[]> _rowEvents;
+  private PriorityQueue<Serializable[]> _priorityQueue;
+
+  private ExecutionStatistics _executionStatistics;
+
+  public SelectionOperator(IndexSegment indexSegment, Selection selection, TransformOperator transformOperator) {
+    _indexSegment = indexSegment;
+    _offset = selection.getOffset();
+    _limit = selection.getSize();
+    _selection = selection;
+    _maxRows = _offset + _limit;
+    _transformOperator = transformOperator;
+    _expressions = _transformOperator.getExpressions();
+
+    List<String> selectColumns = selection.getSelectionColumns();
+    if (selectColumns.size() == 1 && selectColumns.get(0).equals("*")) {
+      selectColumns = new LinkedList<>(indexSegment.getPhysicalColumnNames());
+    }
+
+    List<String> orderByColumns = new ArrayList<>();
+    if (selection.getSelectionSortSequence() != null) {
+      for (SelectionSort selectionSort : selection.getSelectionSortSequence()) {
+        String expression = selectionSort.getColumn();
+        orderByColumns.add(expression);
+      }
+    }
+    _selectExpressions = new ArrayList<>();
+    _orderByExpressions = new ArrayList<>();
+    _orderByIndices = new ArrayList<>();
+    for (int i = 0; i < _expressions.size(); i++) {
+      TransformExpressionTree expression = _expressions.get(i);
+      if (selectColumns.contains(expression.toString())) {
+        _selectExpressions.add(expression);
+      }
+    }
+
+    for (String orderByColumn : orderByColumns) {
+      for (int i = 0; i < _expressions.size(); i++) {
+        TransformExpressionTree expression = _expressions.get(i);
+        if (orderByColumn.equalsIgnoreCase(expression.toString())) {
+          _orderByExpressions.add(expression);
+          _orderByIndices.add(i);
+        }
+      }
+    }
+
+    _blockValSets = new BlockValSet[_expressions.size()];
+    _expressionResultMetadata = new TransformResultMetadata[_expressions.size()];
+    _dictionaries = new Dictionary[_expressions.size()];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[_expressions.size()];
+    String[] columnNames = new String[_expressions.size()];
+    for (int i = 0; i < _expressions.size(); i++) {
+      TransformExpressionTree expression = _expressions.get(i);
+      _expressionResultMetadata[i] = _transformOperator.getResultMetadata(expression);
+      columnDataTypes[i] = DataSchema.ColumnDataType
+          .fromDataType(_expressionResultMetadata[i].getDataType(), _expressionResultMetadata[i].isSingleValue());
+      columnNames[i] = expression.toString();
+      if (_expressionResultMetadata[i].hasDictionary()) {
+        _dictionaries[i] = _transformOperator.getDictionary(expression);
+      }
+    }
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+    if (_orderByExpressions.isEmpty()) {
+      _rowEvents = new ArrayList<>();
+    } else {
+      Comparator<Serializable[]> comparator = getStrictComparator();
+      _priorityQueue = new PriorityQueue<>(_maxRows, comparator);
+    }
+  }
+
+  private Comparator<Serializable[]> getStrictComparator() {
+    return new Comparator<Serializable[]>() {
+      @Override
+      public int compare(Serializable[] o1, Serializable[] o2) {
+        List<SelectionSort> sortSequence = _selection.getSelectionSortSequence();
+        int numSortColumns = sortSequence.size();
+        for (int i = 0; i < numSortColumns; i++) {
+          int ret = 0;
+          SelectionSort selectionSort = sortSequence.get(i);
+          int index = _orderByIndices.get(i);
+          // Only compare single-value columns.
+          if (!_expressionResultMetadata[index].isSingleValue()) {
+            continue;
+          }
+
+          Serializable v1 = o1[index];
+          Serializable v2 = o2[index];
+
+          DataType dataType = _expressionResultMetadata[index].getDataType();
+          switch (dataType) {
+            case INT:
+              if (!selectionSort.isIsAsc()) {
+                ret = ((Integer) v1).compareTo((Integer) v2);
+              } else {
+                ret = ((Integer) v2).compareTo((Integer) v1);
+              }
+              break;
+            case LONG:
+              if (!selectionSort.isIsAsc()) {
+                ret = ((Long) v1).compareTo((Long) v2);
+              } else {
+                ret = ((Long) v2).compareTo((Long) v1);
+              }
+              break;
+            case FLOAT:
+              if (!selectionSort.isIsAsc()) {
+                ret = ((Float) v1).compareTo((Float) v2);
+              } else {
+                ret = ((Float) v2).compareTo((Float) v1);
+              }
+              break;
+            case DOUBLE:
+              if (!selectionSort.isIsAsc()) {
+                ret = ((Double) v1).compareTo((Double) v2);
+              } else {
+                ret = ((Double) v2).compareTo((Double) v1);
+              }
+              break;
+            case BOOLEAN:
+            case STRING:
+              if (!selectionSort.isIsAsc()) {
+                ret = ((String) v1).compareTo((String) v2);
+              } else {
+                ret = ((String) v2).compareTo((String) v1);
+              }
+              break;
+            case BYTES:
+              if (!selectionSort.isIsAsc()) {
+                ret = ByteArray.compare((byte[]) v1, (byte[]) v2);
+              } else {
+                ret = ByteArray.compare((byte[]) v2, (byte[]) v1);
+              }
+            default:
+              break;
+          }
+
+          if (ret != 0) {
+            return ret;
+          }
+        }
+        return 0;
+      }
+    };
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numDocsScanned = 0;
+
+    TransformBlock transformBlock;
+    boolean selectionOnly = _orderByExpressions.isEmpty();
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      for (int i = 0; i < _expressions.size(); i++) {
+        TransformExpressionTree expression = _expressions.get(i);
+        _blockValSets[i] = transformBlock.getBlockValueSet(expression);
+      }
+      TransformBlockDataFetcher dataFetcher;
+      dataFetcher = new TransformBlockDataFetcher(_blockValSets, _dictionaries, _expressionResultMetadata);
+      if (selectionOnly) {
+        int docId = 0;
+        while (_rowEvents.size() < _limit && docId < transformBlock.getNumDocs()) {
+          Serializable[] row = dataFetcher.getRow(docId);
+          _rowEvents.add(row);
+          docId = docId + 1;
+        }
+        if (_rowEvents.size() >= _limit) {
+          numDocsScanned += docId;
+          break;
+        }
+      } else {
+        //Selection + orderby
+        int docId = 0;
+        while (docId < transformBlock.getNumDocs()) {
+          Serializable[] row = dataFetcher.getRow(docId);
+          if (_priorityQueue.size() < _maxRows) {
+            _priorityQueue.add(row);
+          } else if (_priorityQueue.comparator().compare(_priorityQueue.peek(), row) < 0) {
+            _priorityQueue.poll();
+            _priorityQueue.offer(row);
+          }
+          docId = docId + 1;
+        }
+        numDocsScanned += docId;
+      }
+    }
+
+    // Create execution statistics.
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = numDocsScanned * _transformOperator.getNumColumnsProjected();
+    long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
+    _executionStatistics =
+        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+            numTotalRawDocs);
+    if (selectionOnly) {
+      return new IntermediateResultsBlock(_dataSchema, _rowEvents);
+    } else {
+      return new IntermediateResultsBlock(_dataSchema, _priorityQueue);
+    }
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return _executionStatistics;
+  }
+}
+
+
+
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
deleted file mode 100644
index b7ddd1e..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.operator.query;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.common.Block;
-import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.operator.ExecutionStatistics;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.query.selection.SelectionOperatorService;
-
-
-/**
- * This MSelectionOperator will take care of applying a selection query to one IndexSegment.
- * nextBlock() will return an IntermediateResultBlock for the given IndexSegment.
- *
- *
- */
-public class SelectionOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
-  private static final String OPERATOR_NAME = "SelectionOrderByOperator";
-
-  private final IndexSegment _indexSegment;
-  private final ProjectionOperator _projectionOperator;
-  private final Selection _selection;
-  private final SelectionOperatorService _selectionOperatorService;
-  private final DataSchema _dataSchema;
-  private final Block[] _blocks;
-  private final Set<String> _selectionColumns = new HashSet<>();
-  private ExecutionStatistics _executionStatistics;
-
-  public SelectionOrderByOperator(IndexSegment indexSegment, Selection selection,
-      ProjectionOperator projectionOperator) {
-    _indexSegment = indexSegment;
-    _selection = selection;
-    _projectionOperator = projectionOperator;
-
-    initColumnarDataSourcePlanNodeMap(indexSegment);
-    _selectionOperatorService = new SelectionOperatorService(_selection, indexSegment);
-    _dataSchema = _selectionOperatorService.getDataSchema();
-    _blocks = new Block[_selectionColumns.size()];
-  }
-
-  private void initColumnarDataSourcePlanNodeMap(IndexSegment indexSegment) {
-    _selectionColumns.addAll(_selection.getSelectionColumns());
-    if ((_selectionColumns.size() == 1) && ((_selectionColumns.toArray(new String[0]))[0].equals("*"))) {
-      _selectionColumns.clear();
-      for (String columnName : indexSegment.getPhysicalColumnNames()) {
-        _selectionColumns.add(columnName);
-      }
-    }
-    if (_selection.getSelectionSortSequence() != null) {
-      for (SelectionSort selectionSort : _selection.getSelectionSortSequence()) {
-        _selectionColumns.add(selectionSort.getColumn());
-      }
-    }
-  }
-
-  @Override
-  protected IntermediateResultsBlock getNextBlock() {
-    int numDocsScanned = 0;
-
-    ProjectionBlock projectionBlock;
-    while ((projectionBlock = _projectionOperator.nextBlock()) != null) {
-      for (int i = 0; i < _dataSchema.size(); i++) {
-        _blocks[i] = projectionBlock.getBlock(_dataSchema.getColumnName(i));
-      }
-      DocIdSetBlock docIdSetBlock = projectionBlock.getDocIdSetBlock();
-      _selectionOperatorService.iterateOnBlocksWithOrdering(docIdSetBlock.getBlockDocIdSet().iterator(), _blocks);
-    }
-
-    // Create execution statistics.
-    numDocsScanned += _selectionOperatorService.getNumDocsScanned();
-    long numEntriesScannedInFilter = _projectionOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
-    long numEntriesScannedPostFilter = numDocsScanned * _projectionOperator.getNumColumnsProjected();
-    long numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
-    _executionStatistics =
-        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
-            numTotalRawDocs);
-
-    return new IntermediateResultsBlock(_selectionOperatorService.getDataSchema(), _selectionOperatorService.getRows());
-  }
-
-  @Override
-  public String getOperatorName() {
-    return OPERATOR_NAME;
-  }
-
-  @Override
-  public ExecutionStatistics getExecutionStatistics() {
-    return _executionStatistics;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java
new file mode 100644
index 0000000..54e83be
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformBlockDataFetcher.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform;
+
+import java.io.Serializable;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.common.utils.primitive.ByteArray;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+public class TransformBlockDataFetcher {
+
+
+  private final Fetcher[] _fetchers;
+
+  public TransformBlockDataFetcher(BlockValSet[] blockValSets, Dictionary[] dictionaries,
+      TransformResultMetadata[] expressionResultMetadata) {
+    _fetchers = new Fetcher[blockValSets.length];
+    for (int i = 0; i < blockValSets.length; i++) {
+      _fetchers[i] = createFetcher(blockValSets[i], dictionaries[i],
+          expressionResultMetadata[i]);
+    }
+  }
+
+  public Serializable[] getRow(int docId) {
+    Serializable[] row = new Serializable[_fetchers.length];
+    for (int i = 0; i < _fetchers.length; i++) {
+      row[i] = _fetchers[i].getValue(docId);
+    }
+    return row;
+  }
+
+  Fetcher createFetcher(BlockValSet blockValSet,
+      Dictionary dictionary,
+      TransformResultMetadata expressionResultMetadata) {
+    if (expressionResultMetadata.hasDictionary()) {
+      if (expressionResultMetadata.isSingleValue()) {
+        return new DictionaryBasedSVValueFetcher(dictionary,
+            blockValSet.getDictionaryIdsSV(), expressionResultMetadata.getDataType());
+      } else {
+        switch (expressionResultMetadata.getDataType()) {
+          case INT:
+            return new DictionaryBasedMVIntValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+          case LONG:
+            return new DictionaryBasedMVLongValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+          case FLOAT:
+            return new DictionaryBasedMVFloatValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+          case DOUBLE:
+            return new DictionaryBasedMVDoubleValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+          case BOOLEAN:
+          case STRING:
+            return new DictionaryBasedMVStringValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+          case BYTES:
+            return new DictionaryBasedMVBytesValueFetcher(dictionary,
+                blockValSet.getDictionaryIdsMV());
+        }
+      }
+    } else {
+      switch (expressionResultMetadata.getDataType()) {
+        case INT:
+          return new SVIntValueFetcher(blockValSet.getIntValuesSV());
+        case LONG:
+          return new SVLongValueFetcher(blockValSet.getLongValuesSV());
+        case FLOAT:
+          return new SVFloatValueFetcher(blockValSet.getFloatValuesSV());
+        case DOUBLE:
+          return new SVDoubleValueFetcher(blockValSet.getDoubleValuesSV());
+        case BOOLEAN:
+        case STRING:
+          return new SVStringValueFetcher(blockValSet.getStringValuesSV());
+        case BYTES:
+          return new SVBytesValueFetcher(blockValSet.getBytesValuesSV());
+      }
+    }
+    throw new UnsupportedOperationException();
+  }
+}
+
+
+interface Fetcher {
+
+  Serializable getValue(int docId);
+}
+
+class SVIntValueFetcher implements Fetcher {
+
+  private int[] _values;
+
+  SVIntValueFetcher(int[] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class SVLongValueFetcher implements Fetcher {
+
+  private long[] _values;
+
+  SVLongValueFetcher(long[] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class SVFloatValueFetcher implements Fetcher {
+
+  private float[] _values;
+
+  SVFloatValueFetcher(float[] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class SVDoubleValueFetcher implements Fetcher {
+
+  private double[] _values;
+
+  SVDoubleValueFetcher(double[] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class SVStringValueFetcher implements Fetcher {
+
+  private String[] _values;
+
+  SVStringValueFetcher(String[] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class SVBytesValueFetcher implements Fetcher {
+
+  private byte[][] _values;
+
+  SVBytesValueFetcher(byte[][] values) {
+    _values = values;
+  }
+
+  public Serializable getValue(int docId) {
+    return _values[docId];
+  }
+}
+
+class DictionaryBasedSVValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[] _dictionaryIds;
+  private DataType _dataType;
+
+  DictionaryBasedSVValueFetcher(Dictionary dictionary, int[] dictionaryIds,
+      DataType dataType) {
+
+    _dictionary = dictionary;
+    _dictionaryIds = dictionaryIds;
+    _dataType = dataType;
+  }
+
+  public Serializable getValue(int docId) {
+    if (_dataType.equals(FieldSpec.DataType.BYTES)) {
+      return BytesUtils.toHexString(_dictionary.getBytesValue(_dictionaryIds[docId]));
+    }
+    return (Serializable) _dictionary.get(_dictionaryIds[docId]);
+  }
+}
+
+
+class DictionaryBasedMVIntValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVIntValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    int[] values = new int[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getIntValue(dictId);
+    }
+    return values;
+  }
+}
+
+class DictionaryBasedMVLongValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVLongValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    long[] values = new long[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getLongValue(dictId);
+    }
+    return values;
+  }
+}
+
+class DictionaryBasedMVFloatValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVFloatValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    float[] values = new float[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getFloatValue(dictId);
+    }
+    return values;
+  }
+}
+
+class DictionaryBasedMVDoubleValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVDoubleValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    double[] values = new double[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getDoubleValue(dictId);
+    }
+    return values;
+  }
+}
+
+class DictionaryBasedMVStringValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVStringValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    String[] values = new String[dictIds.length];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getStringValue(dictId);
+    }
+    return values;
+  }
+}
+
+class DictionaryBasedMVBytesValueFetcher implements Fetcher {
+
+  private Dictionary _dictionary;
+  private int[][] _dictionaryIdsArray;
+
+  DictionaryBasedMVBytesValueFetcher(Dictionary dictionary, int[][] dictionaryIdsArray) {
+
+    _dictionary = dictionary;
+    _dictionaryIdsArray = dictionaryIdsArray;
+  }
+
+  public Serializable getValue(int docId) {
+    int[] dictIds = _dictionaryIdsArray[docId];
+    byte[][] values = new byte[dictIds.length][];
+    for (int i = 0; i < dictIds.length; i++) {
+      int dictId = dictIds[i];
+      values[i] = _dictionary.getBytesValue(dictId);
+    }
+    return values;
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index 1c0356a..2081684 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.transform;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
@@ -38,11 +39,13 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
  * Class for evaluating transform expressions.
  */
 public class TransformOperator extends BaseOperator<TransformBlock> {
+
   private static final String OPERATOR_NAME = "TransformOperator";
 
   private final ProjectionOperator _projectionOperator;
   private final Map<String, DataSource> _dataSourceMap;
   private final Map<TransformExpressionTree, TransformFunction> _transformFunctionMap = new HashMap<>();
+  private final List<TransformExpressionTree> _expressions;
 
   /**
    * Constructor for the class
@@ -51,11 +54,13 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
    * @param expressions Set of expressions to evaluate
    */
   public TransformOperator(@Nonnull ProjectionOperator projectionOperator,
-      @Nonnull Set<TransformExpressionTree> expressions) {
+      @Nonnull List<TransformExpressionTree> expressions) {
     _projectionOperator = projectionOperator;
+    _expressions = expressions;
     _dataSourceMap = projectionOperator.getDataSourceMap();
     for (TransformExpressionTree expression : expressions) {
-      TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+      TransformFunction transformFunction = TransformFunctionFactory
+          .get(expression, _dataSourceMap);
       _transformFunctionMap.put(expression, transformFunction);
     }
   }
@@ -109,4 +114,8 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
   public ExecutionStatistics getExecutionStatistics() {
     return _projectionOperator.getExecutionStatistics();
   }
+
+  public List<TransformExpressionTree> getExpressions() {
+    return _expressions;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
index a09c004..075d2aa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/MetadataBasedAggregationPlanNode.java
@@ -28,7 +28,7 @@ import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.query.MetadataBasedAggregationOperator;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
index 4a4449b..ef6db3b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java
@@ -23,54 +23,38 @@ import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.operator.query.SelectionOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a single segment.
+ * The <code>SelectionPlanNode</code> class provides the execution plan for selection query on a
+ * single segment.
  */
 public class SelectionPlanNode implements PlanNode {
-  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionPlanNode.class);
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionPlanNode.class);
   private final IndexSegment _indexSegment;
   private final Selection _selection;
-  private final ProjectionPlanNode _projectionPlanNode;
+  private TransformPlanNode _transformPlanNode;
 
   public SelectionPlanNode(IndexSegment indexSegment, BrokerRequest brokerRequest) {
     _indexSegment = indexSegment;
     _selection = brokerRequest.getSelections();
-
     if (_selection.getSize() > 0) {
-      int maxDocPerNextCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
-
-      // No ordering required, select minimum number of documents
-      if (!_selection.isSetSelectionSortSequence()) {
-        maxDocPerNextCall = Math.min(_selection.getOffset() + _selection.getSize(), maxDocPerNextCall);
-      }
-
-      DocIdSetPlanNode docIdSetPlanNode = new DocIdSetPlanNode(_indexSegment, brokerRequest, maxDocPerNextCall);
-      _projectionPlanNode = new ProjectionPlanNode(_indexSegment,
-          SelectionOperatorUtils.extractSelectionRelatedColumns(_selection, indexSegment), docIdSetPlanNode);
+      _transformPlanNode = new TransformPlanNode(_indexSegment, brokerRequest);
     } else {
-      _projectionPlanNode = null;
+      _transformPlanNode = null;
     }
   }
 
   @Override
   public Operator run() {
-    if (_selection.getSize() > 0) {
-      if (_selection.isSetSelectionSortSequence()) {
-        return new SelectionOrderByOperator(_indexSegment, _selection, _projectionPlanNode.run());
-      } else {
-        return new SelectionOnlyOperator(_indexSegment, _selection, _projectionPlanNode.run());
-      }
-    } else {
+    if (_selection.getSize() <= 0) {
       return new EmptySelectionOperator(_indexSegment, _selection);
     }
+    return new SelectionOperator(_indexSegment, _selection, _transformPlanNode.run());
   }
 
   @Override
@@ -88,8 +72,8 @@ public class SelectionPlanNode implements PlanNode {
     LOGGER.debug(prefix + "Argument 0: IndexSegment - " + _indexSegment.getSegmentName());
     LOGGER.debug(prefix + "Argument 1: Selections - " + _selection);
     if (_selection.getSize() > 0) {
-      LOGGER.debug(prefix + "Argument 2: Projection -");
-      _projectionPlanNode.showTree(prefix + "    ");
+      LOGGER.debug(prefix + "Argument 2: Transform -");
+      _transformPlanNode.showTree(prefix + "    ");
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
index 8dac5be..c1e19d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/TransformPlanNode.java
@@ -18,15 +18,24 @@
  */
 package org.apache.pinot.core.plan;
 
+import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.getSelectionColumns;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.transform.TransformOperator;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +50,8 @@ public class TransformPlanNode implements PlanNode {
   private final String _segmentName;
   private final ProjectionPlanNode _projectionPlanNode;
   private final Set<String> _projectionColumns = new HashSet<>();
-  private final Set<TransformExpressionTree> _expressionTrees = new HashSet<>();
+  private final Set<TransformExpressionTree> _expressionTrees = new LinkedHashSet<>();
+  private int _maxDocPerNextCall = DocIdSetPlanNode.MAX_DOC_PER_CALL;
 
   /**
    * Constructor for the class
@@ -51,17 +61,19 @@ public class TransformPlanNode implements PlanNode {
    */
   public TransformPlanNode(@Nonnull IndexSegment indexSegment, @Nonnull BrokerRequest brokerRequest) {
     _segmentName = indexSegment.getSegmentName();
-    extractColumnsAndTransforms(brokerRequest);
+    extractColumnsAndTransforms(brokerRequest, indexSegment);
     _projectionPlanNode =
-        new ProjectionPlanNode(indexSegment, _projectionColumns, new DocIdSetPlanNode(indexSegment, brokerRequest));
+        new ProjectionPlanNode(indexSegment, _projectionColumns, new DocIdSetPlanNode(indexSegment, brokerRequest, _maxDocPerNextCall));
   }
 
   /**
    * Helper method to extract projection columns and transform expressions from the given broker request.
    *
    * @param brokerRequest Broker request to process
+   * @param indexSegment
    */
-  private void extractColumnsAndTransforms(@Nonnull BrokerRequest brokerRequest) {
+  private void extractColumnsAndTransforms(@Nonnull BrokerRequest brokerRequest,
+      IndexSegment indexSegment) {
     if (brokerRequest.isSetAggregationsInfo()) {
       for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
         if (!aggregationInfo.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
@@ -81,15 +93,35 @@ public class TransformPlanNode implements PlanNode {
         }
       }
     } else {
-      throw new UnsupportedOperationException("Transforms not supported in selection queries.");
-      // TODO: Add transform support.
-      // projectionColumns.addAll(brokerRequest.getSelections().getSelectionColumns());
+      Selection selection = brokerRequest.getSelections();
+      // No ordering required, select minimum number of documents
+      if (!selection.isSetSelectionSortSequence()) {
+        _maxDocPerNextCall = Math.min(selection.getOffset() + selection.getSize(), _maxDocPerNextCall);
+      }
+      List<String> expressions = selection.getSelectionColumns();
+      if (expressions.size() == 1 && expressions.get(0).equals("*")) {
+        expressions = new LinkedList<>(indexSegment.getPhysicalColumnNames());
+        Collections.sort(expressions);
+      }
+      if (selection.getSelectionSortSequence() != null) {
+        for (SelectionSort selectionSort : selection.getSelectionSortSequence()) {
+          String expression = selectionSort.getColumn();
+          if(!expressions.contains(expression)) {
+            expressions.add(expression);
+          }
+        }
+      }
+      for (String expression : expressions) {
+        TransformExpressionTree transformExpressionTree = TransformExpressionTree.compileToExpressionTree(expression);
+        transformExpressionTree.getColumns(_projectionColumns);
+        _expressionTrees.add(transformExpressionTree);
+      }
     }
   }
 
   @Override
   public TransformOperator run() {
-    return new TransformOperator(_projectionPlanNode.run(), _expressionTrees);
+    return new TransformOperator(_projectionPlanNode.run(), new ArrayList<>(_expressionTrees));
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index bc62a20..56a20fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -38,7 +38,7 @@ import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.plan.SelectionPlanNode;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
index b43d03e..c9d09b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DefaultAggregationExecutor.java
@@ -24,7 +24,7 @@ import javax.annotation.Nonnull;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
index 8f4600b..75e9bbe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index e1f1d69..23271c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import com.google.common.base.Preconditions;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
 
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index b731bf9..a7cfd25 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Locale;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.plan.AggregationFunctionInitializer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
index 6aef5f0..5f43415 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
index 6959290..cd0286c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
index c3ace67..4184be3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
index 50f3648..eecc0cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/CountMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 607c6c3..60b70c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
index f35709c..ee04cf0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java
@@ -22,6 +22,7 @@ import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
index f8a1d8c..1dabb6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLMVAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index b951b2c..431cd39 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
index ce7cae2..f07b7ba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
index 1da478d..fac31f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountRawHLLMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
index c071d6f..1cf3940 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FastHLLAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
index 1060b8b..3fff0ed 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
index 655382a..48ab679 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
index b1881c1..c01b2b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
index f753a04..3525f48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
index 717817c..259b299 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
index b9fa7a3..391016c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MinMaxRangeMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
index 0e3b09a..cf51998 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileAggregationFunction.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.query.aggregation.function;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.util.Arrays;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
index 3c2c91b..18ab698 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
index 25bb816..572d9a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileEstMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
index 74d7458..77fec93 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileMVAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
index 0d4db7f..a3db710 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestAggregationFunction.java
@@ -22,6 +22,7 @@ import com.tdunning.math.stats.TDigest;
 import java.nio.ByteBuffer;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
index 9b7966e..2ee71ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileTDigestMVAggregationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.query.aggregation.function;
 
 import com.tdunning.math.stats.TDigest;
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
index 1071022..fed2722 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
index e0035f8..e2f423b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/SumMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import javax.annotation.Nonnull;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
index 6ccb599..70e3806 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java
@@ -29,7 +29,7 @@ import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 5c4bb95..083db4d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.request;
 
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -31,7 +32,7 @@ import org.apache.pinot.common.request.Selection;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.TimerContext;
 
@@ -62,6 +63,7 @@ public class ServerQueryRequest {
   private final Set<TransformExpressionTree> _groupByExpressions;
   private final Set<String> _groupByColumns;
   private final Set<String> _selectionColumns;
+  private final Set<TransformExpressionTree> _selectionExpressions;
 
   // Query processing context
   private volatile int _segmentCountAfterPruning = -1;
@@ -121,10 +123,17 @@ public class ServerQueryRequest {
     // Selection
     Selection selection = _brokerRequest.getSelections();
     if (selection != null) {
-      _selectionColumns = RequestUtils.extractSelectionColumns(selection);
+      _selectionExpressions = new LinkedHashSet<>();
+      Set<String> selectionColumns = RequestUtils.extractSelectionColumns(selection);
+      for (String expression : selectionColumns) {
+        _selectionExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+      }
+      _selectionColumns = RequestUtils.extractColumnsFromExpressions(_selectionExpressions);
       _allColumns.addAll(_selectionColumns);
+
     } else {
       _selectionColumns = null;
+      _selectionExpressions = null;
     }
   }
 
@@ -194,4 +203,9 @@ public class ServerQueryRequest {
   public Set<String> getSelectionColumns() {
     return _selectionColumns;
   }
+
+  @Nullable
+  public Set<TransformExpressionTree> getSelectionExpressions() {
+    return _selectionExpressions;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 783cdc5..a24f441 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -76,7 +76,7 @@ public class SelectionOperatorUtils {
   private static final DecimalFormatSymbols DECIMAL_FORMAT_SYMBOLS = DecimalFormatSymbols.getInstance(Locale.US);
 
   /**
-   * Expand <code>'SELECT *'</code> to select all columns with {@link IndexSegment}, order all columns alphabatically.
+   * Expand <code>'SELECT *'</code> to select all columns with {@link IndexSegment}, order all columns alphabetically.
    * (Inner segment)
    *
    * @param selectionColumns unexpanded selection columns (may contain '*').
@@ -88,8 +88,6 @@ public class SelectionOperatorUtils {
       @Nonnull IndexSegment indexSegment) {
     if (selectionColumns.size() == 1 && selectionColumns.get(0).equals("*")) {
       List<String> allColumns = new LinkedList<>(indexSegment.getPhysicalColumnNames());
-      Set<String> columnNames = indexSegment.getPhysicalColumnNames();
-
       Collections.sort(allColumns);
       return allColumns;
     } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
index e170a43..75146ef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java
@@ -24,7 +24,7 @@ import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.DefaultAggregationExecutor;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
index 042c37a..af0e83b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java
@@ -25,7 +25,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.startree.StarTreeUtils;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
index 10c2276..4463dcd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.startree.plan;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
@@ -65,7 +66,7 @@ public class StarTreeTransformPlanNode implements PlanNode {
 
   @Override
   public TransformOperator run() {
-    return new TransformOperator(_starTreeProjectionPlanNode.run(), _groupByExpressions);
+    return new TransformOperator(_starTreeProjectionPlanNode.run(), new ArrayList<>(_groupByExpressions));
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
index 75a7b6c..8056187 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPair.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.startree.v2;
 
 import javax.annotation.Nonnull;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 
 
 public class AggregationFunctionColumnPair {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
index 596f550..9c7cb0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/builder/BaseSingleTreeBuilder.java
@@ -37,7 +37,7 @@ import org.apache.pinot.core.data.aggregator.ValueAggregatorFactory;
 import org.apache.pinot.core.data.readers.PinotSegmentColumnReader;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory.CompressionType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.segment.creator.SingleValueRawIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
index 074eea5..337fdd6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeLoaderUtils.java
@@ -31,7 +31,7 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.segment.StarTreeMetadata;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.data.aggregator.ValueAggregatorFactory;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.column.ColumnIndexContainer;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index 7310601..7b4456a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
index 0f34566..fd4f338 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionTypeTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
index 0264b3b..793cb30 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/AggregationFunctionColumnPairTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.startree.v2;
 
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index fb71bb1..372fb19 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -51,7 +51,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.plan.FilterPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 2bdd287..f8f8b9e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -19,14 +19,15 @@
 package org.apache.pinot.queries;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -49,11 +50,13 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 10);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
@@ -65,11 +68,12 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 10);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
   }
 
@@ -78,7 +82,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -86,17 +90,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 10);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 10);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
-    Assert.assertEquals(firstRow[5], new int[]{2147483647});
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+    Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
 
     // Test query with filter
     selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -107,17 +113,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 10);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(5), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(5), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 10);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
-    Assert.assertEquals(firstRow[5], new int[]{2147483647});
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+    Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
   }
 
   @Test
@@ -125,7 +133,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -133,17 +141,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 3);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(2), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(2), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 3);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
-    Assert.assertEquals(firstRow[2], new int[]{2147483647});
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+    Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
 
     // Test query with filter
     selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -154,17 +164,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 3);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(2), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(2), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 3);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 890282370);
-    Assert.assertEquals(firstRow[2], new int[]{2147483647});
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 890282370);
+    Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
   }
 
   @Test
@@ -172,7 +184,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 100000L);
@@ -180,17 +192,19 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 400000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 4);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column5");
-    Assert.assertEquals(selectionDataSchema.getColumnName(3), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.STRING);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(3), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 4);
-    Assert.assertEquals((String) lastRow[0], "AKXcXcIqsqOJFsdwxZ");
-    Assert.assertEquals(lastRow[3], new int[]{1252});
+    Assert.assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
+    Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
 
     // Test query with filter
     selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -201,16 +215,27 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 62480L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 4);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column5");
-    Assert.assertEquals(selectionDataSchema.getColumnName(3), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.STRING);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(3), DataSchema.ColumnDataType.INT_ARRAY);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT_ARRAY);
     selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 4);
-    Assert.assertEquals((String) lastRow[0], "AKXcXcIqsqOJFsdwxZ");
-    Assert.assertEquals(lastRow[3], new int[]{2147483647});
+    Assert.assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
+    Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
+  }
+
+  private Map<String, Integer> computeColumnNameToIndexMap(DataSchema dataSchema) {
+    Map<String, Integer> columnIndexMap = new HashMap<>();
+
+    for (int i = 0; i < dataSchema.size(); i++) {
+      columnIndexMap.put(dataSchema.getColumnName(i), i);
+    }
+    return columnIndexMap;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 835975e..a731f72 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -19,14 +19,15 @@
 package org.apache.pinot.queries;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.EmptySelectionOperator;
-import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
-import org.apache.pinot.core.operator.query.SelectionOrderByOperator;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -49,11 +50,12 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
@@ -65,11 +67,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 11);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
   }
 
@@ -78,7 +82,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -86,18 +90,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 11);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 1578964907);
-    Assert.assertEquals((String) firstRow[1], "P");
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 1578964907);
+    Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "P");
 
     // Test query with filter
     selectionOnlyOperator = getOperatorForQueryWithFilter(query);
@@ -108,18 +113,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 11);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 351823652);
-    Assert.assertEquals((String) firstRow[1], "t");
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 351823652);
+    Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "t");
   }
 
   @Test
@@ -127,7 +133,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    SelectionOnlyOperator selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -135,11 +141,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 3);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(2), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     List<Serializable[]> selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] firstRow = selectionResult.get(0);
@@ -156,17 +164,18 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 3);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnName(2), "column11");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertTrue(columnIndexMap.containsKey("column11"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column11")), DataSchema.ColumnDataType.STRING);
     selectionResult = (List<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     firstRow = selectionResult.get(0);
     Assert.assertEquals(firstRow.length, 3);
-    Assert.assertEquals(((Integer) firstRow[0]).intValue(), 351823652);
-    Assert.assertEquals((String) firstRow[2], "t");
+    Assert.assertEquals(((Integer) firstRow[columnIndexMap.get("column1")]).intValue(), 351823652);
+    Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "t");
   }
 
   @Test
@@ -174,7 +183,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -182,17 +191,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 120000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 4);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 4);
-    Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
-    Assert.assertEquals(((Integer) lastRow[1]).intValue(), 10542595);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
     selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -203,17 +214,19 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 24516L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(selectionDataSchema.size(), 4);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 4);
-    Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
-    Assert.assertEquals(((Integer) lastRow[1]).intValue(), 462769197);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
   }
 
   @Test
@@ -221,7 +234,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * " + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    SelectionOrderByOperator selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -229,18 +242,20 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 330000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
     Assert.assertEquals(selectionDataSchema.size(), 11);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     Queue<Serializable[]> selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     Serializable[] lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 11);
-    Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
-    Assert.assertEquals(((Integer) lastRow[1]).intValue(), 10542595);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
     selectionOrderByOperator = getOperatorForQueryWithFilter(query);
@@ -251,18 +266,20 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 67419);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
     selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
+
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
     Assert.assertEquals(selectionDataSchema.size(), 11);
-    Assert.assertEquals(selectionDataSchema.getColumnName(0), "column6");
-    Assert.assertEquals(selectionDataSchema.getColumnName(1), "column1");
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
-    Assert.assertEquals(selectionDataSchema.getColumnDataType(1), DataSchema.ColumnDataType.INT);
+    Assert.assertTrue(columnIndexMap.containsKey("column6"));
+    Assert.assertTrue(columnIndexMap.containsKey("column1"));
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     selectionResult = (Queue<Serializable[]>) resultsBlock.getSelectionResult();
     Assert.assertEquals(selectionResult.size(), 10);
     lastRow = selectionResult.peek();
     Assert.assertEquals(lastRow.length, 11);
-    Assert.assertEquals(((Integer) lastRow[0]).intValue(), 6043515);
-    Assert.assertEquals(((Integer) lastRow[1]).intValue(), 462769197);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
+    Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
   }
 
   private int getVirtualColumns(DataSchema selectionDataSchema) {
@@ -274,4 +291,13 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     }
     return virtualCols;
   }
+
+  private Map<String, Integer> computeColumnNameToIndexMap(DataSchema dataSchema) {
+    Map<String, Integer> columnIndexMap = new HashMap<>();
+
+    for (int i = 0; i < dataSchema.size(); i++) {
+      columnIndexMap.put(dataSchema.getColumnName(i), i);
+    }
+    return columnIndexMap;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
index 62402aa..2d8e0ea 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/DefaultAggregationExecutorTest.java
@@ -138,7 +138,7 @@ public class DefaultAggregationExecutorTest {
     MatchAllFilterOperator matchAllFilterOperator = new MatchAllFilterOperator(totalRawDocs);
     DocIdSetOperator docIdSetOperator = new DocIdSetOperator(matchAllFilterOperator, DocIdSetPlanNode.MAX_DOC_PER_CALL);
     ProjectionOperator projectionOperator = new ProjectionOperator(dataSourceMap, docIdSetOperator);
-    TransformOperator transformOperator = new TransformOperator(projectionOperator, expressionTrees);
+    TransformOperator transformOperator = new TransformOperator(projectionOperator, new ArrayList<>(expressionTrees));
     TransformBlock transformBlock = transformOperator.nextBlock();
     int numAggFuncs = _aggregationInfoList.size();
     AggregationFunctionContext[] aggrFuncContextArray = new AggregationFunctionContext[numAggFuncs];
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ac6a445..8d9d144 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
@@ -40,7 +41,9 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -135,11 +138,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
   private void registerCallbackHandlers() {
     List<String> instances = _helixAdmin.getInstancesInCluster(_clusterName);
-    instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance.startsWith(
-        CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
+    instances.removeIf(instance -> (!instance.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) && !instance
+        .startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)));
     List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_clusterName);
-    resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource)
-        && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource)));
+    resourcesInCluster.removeIf(
+        resource -> (!TableNameBuilder.isTableResource(resource) && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE
+            .equals(resource)));
     for (String instance : instances) {
       List<String> resourcesToMonitor = new ArrayList<>();
       for (String resourceName : resourcesInCluster) {
@@ -151,11 +155,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
           }
         }
       }
-      _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(
-          new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName, instance,
-              resourcesToMonitor, 100.0),
-          new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName, instance,
-              resourcesToMonitor, 100.0))));
+      _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
+          .of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, _clusterName,
+                  instance, resourcesToMonitor, 100.0),
+              new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, _clusterName,
+                  instance, resourcesToMonitor, 100.0))));
     }
   }
 
@@ -459,7 +463,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testUDF()
+  public void testGroupByUDF()
       throws Exception {
     String pqlQuery = "SELECT COUNT(*) FROM mytable GROUP BY timeConvert(DaysSinceEpoch,'DAYS','SECONDS')";
     JsonNode response = postQuery(pqlQuery);
@@ -532,6 +536,68 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(aggregationResult.get("value").asDouble(), 16071.0 / 2);
   }
 
+  @Test
+  public void testAggregationUDF()
+      throws Exception {
+
+    String pqlQuery = "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable";
+    JsonNode response = postQuery(pqlQuery);
+    JsonNode aggregationResult = response.get("aggregationResults").get(0);
+    assertEquals(aggregationResult.get("function").asText(), "max_timeconvert(DaysSinceEpoch,'DAYS','SECONDS')");
+    assertEquals(aggregationResult.get("value").asDouble(), 16435.0 * 24 * 3600);
+
+    pqlQuery = "SELECT MIN(div(DaysSinceEpoch,2)) FROM mytable";
+    response = postQuery(pqlQuery);
+    aggregationResult = response.get("aggregationResults").get(0);
+    assertEquals(aggregationResult.get("function").asText(), "min_div(DaysSinceEpoch,'2')");
+    assertEquals(aggregationResult.get("value").asDouble(), 16071.0 / 2);
+  }
+
+  @Test
+  public void testSelectionUDF()
+      throws Exception {
+    String pqlQuery = "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable";
+    JsonNode response = postQuery(pqlQuery);
+    ArrayNode selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+    Assert.assertNotNull(selectionResults);
+    Assert.assertTrue(selectionResults.size() > 0);
+    for (int i = 0; i < selectionResults.size(); i++) {
+      long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+      long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+      Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+    }
+
+    pqlQuery =
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000";
+    response = postQuery(pqlQuery);
+    selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+    Assert.assertNotNull(selectionResults);
+    Assert.assertTrue(selectionResults.size() > 0);
+    long prevValue = -1;
+    for (int i = 0; i < selectionResults.size(); i++) {
+      long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+      long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+      Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+      Assert.assertTrue(daysSinceEpoch >= prevValue);
+      prevValue = daysSinceEpoch;
+    }
+
+    pqlQuery =
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000";
+    response = postQuery(pqlQuery);
+    selectionResults = (ArrayNode) response.get("selectionResults").get("results");
+    Assert.assertNotNull(selectionResults);
+    Assert.assertTrue(selectionResults.size() > 0);
+    prevValue = Long.MAX_VALUE;
+    for (int i = 0; i < selectionResults.size(); i++) {
+      long daysSinceEpoch = selectionResults.get(i).get(0).asLong();
+      long secondsSinceEpoch = selectionResults.get(i).get(1).asLong();
+      Assert.assertEquals(daysSinceEpoch * 24 * 60 * 60, secondsSinceEpoch);
+      Assert.assertTrue(secondsSinceEpoch <= prevValue);
+      prevValue = secondsSinceEpoch;
+    }
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
index 2a13eb6..5485dc3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
@@ -29,7 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionType;
+import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
 import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.tools.query.comparison.SegmentInfoProvider;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org