You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/10 03:44:28 UTC

[incubator-pinot] branch master updated: Rewrite non-aggregate group by query to distinct query (#5671)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b327509  Rewrite non-aggregate group by query to distinct query (#5671)
b327509 is described below

commit b32750951f0fb24923a6280f066a6cd037e1f0cb
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Jul 9 20:44:13 2020 -0700

    Rewrite non-aggregate group by query to distinct query (#5671)
    
    * Rewrite non-aggregate group by query to distinct query
    
    * Adding tests to DistinctQueriesTest
    
    * Update DistinctQueriesTest.java
---
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |  74 +++++++-
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |  69 +++++++
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  25 ++-
 .../apache/pinot/queries/DistinctQueriesTest.java  | 208 ++++++++++++++++-----
 .../apache/pinot/queries/FastHllQueriesTest.java   |   6 +-
 ...nerSegmentAggregationMultiValueQueriesTest.java |  24 +--
 ...erSegmentAggregationSingleValueQueriesTest.java |  24 +--
 ...InnerSegmentSelectionMultiValueQueriesTest.java |  16 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  24 +--
 .../queries/PercentileTDigestQueriesTest.java      |   4 +-
 .../RangePredicateWithSortedInvertedIndexTest.java |   2 +-
 .../pinot/queries/SerializedBytesQueriesTest.java  |   6 +-
 .../pinot/queries/TextSearchQueriesTest.java       |   4 +-
 .../apache/pinot/queries/TransformQueriesTest.java |   4 +-
 .../tests/ClusterIntegrationTestUtils.java         |  18 +-
 .../tests/OfflineClusterIntegrationTest.java       |  41 ++++
 16 files changed, 437 insertions(+), 112 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index de14a51..b56698b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.sql.parsers;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -196,13 +198,25 @@ public class CalciteSqlParser {
     return false;
   }
 
-  public static Set<String> extractIdentifiers(List<Expression> expressions) {
+  /**
+   * Extract all the identifiers from given expressions.
+   *
+   * @param expressions
+   * @param excludeAs if true, ignores the right side identifier for AS function.
+   * @return all the identifier names.
+   */
+  public static Set<String> extractIdentifiers(List<Expression> expressions, boolean excludeAs) {
     Set<String> identifiers = new HashSet<>();
     for (Expression expression : expressions) {
       if (expression.getIdentifier() != null) {
         identifiers.add(expression.getIdentifier().getName());
       } else if (expression.getFunctionCall() != null) {
-        identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands()));
+        if (excludeAs && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) {
+          identifiers.addAll(extractIdentifiers(Arrays.asList(expression.getFunctionCall().getOperands().get(0)), true));
+          continue;
+        } else {
+          identifiers.addAll(extractIdentifiers(expression.getFunctionCall().getOperands(), excludeAs));
+        }
       }
     }
     return identifiers;
@@ -331,12 +345,68 @@ public class CalciteSqlParser {
       pinotQuery.setFilterExpression(updatedFilterExpression);
     }
 
+    // Rewrite GroupBy to Distinct
+    rewriteNonAggregationGroupByToDistinct(pinotQuery);
+
     // Update alias
     Map<Identifier, Expression> aliasMap = extractAlias(pinotQuery.getSelectList());
     applyAlias(aliasMap, pinotQuery);
     validate(aliasMap, pinotQuery);
   }
 
+  /**
+   * Rewrite non-aggregate group by query to distinct query.
+   * E.g.
+   * ```
+   *   SELECT col1+col2*5 FROM foo GROUP BY col1, col2 => SELECT distinct col1+col2*5 FROM foo
+   *   SELECT col1, col2 FROM foo GROUP BY col1, col2 => SELECT distinct col1, col2 FROM foo
+   * ```
+   * @param pinotQuery
+   */
+  private static void rewriteNonAggregationGroupByToDistinct(PinotQuery pinotQuery) {
+    boolean hasAggregation = false;
+    for (Expression select : pinotQuery.getSelectList()) {
+      if (isAggregateExpression(select)) {
+        hasAggregation = true;
+      }
+    }
+    if (pinotQuery.getOrderByList() != null) {
+      for (Expression orderBy : pinotQuery.getOrderByList()) {
+        if (isAggregateExpression(orderBy)) {
+          hasAggregation = true;
+        }
+      }
+    }
+    if (!hasAggregation && pinotQuery.getGroupByListSize() > 0) {
+      Set<String> selectIdentifiers = extractIdentifiers(pinotQuery.getSelectList(), true);
+      Set<String> groupByIdentifiers = extractIdentifiers(pinotQuery.getGroupByList(), true);
+      if (groupByIdentifiers.containsAll(selectIdentifiers)) {
+        Expression distinctExpression = RequestUtils.getFunctionExpression("DISTINCT");
+        for (Expression select : pinotQuery.getSelectList()) {
+          if (isAsFunction(select)) {
+            Function asFunc = select.getFunctionCall();
+            distinctExpression.getFunctionCall().addToOperands(asFunc.getOperands().get(0));
+          } else {
+            distinctExpression.getFunctionCall().addToOperands(select);
+          }
+        }
+        pinotQuery.setSelectList(Arrays.asList(distinctExpression));
+        pinotQuery.setGroupByList(Collections.emptyList());
+      } else {
+        selectIdentifiers.removeAll(groupByIdentifiers);
+        throw new SqlCompilationException(String.format("For non-aggregation group by query, all the identifiers in select clause should be in groupBys. Found identifier: %s",
+            Arrays.toString(selectIdentifiers.toArray(new String[0]))));
+      }
+    }
+  }
+
+  private static boolean isAsFunction(Expression expression) {
+    if (expression.getFunctionCall() != null && expression.getFunctionCall().getOperator().equalsIgnoreCase("AS")) {
+      return true;
+    }
+    return false;
+  }
+
   private static void invokeCompileTimeFunctions(PinotQuery pinotQuery) {
     for (int i = 0; i < pinotQuery.getSelectListSize(); i++) {
       Expression expression = invokeCompileTimeFunctionExpression(pinotQuery.getSelectList().get(i));
diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 826741c..810e384 100644
--- a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -1724,4 +1724,73 @@ public class CalciteSqlCompilerTest {
     Assert.assertEquals(brokerRequest.getFilterQuery().getOperator(), FilterOperator.IS_NULL);
     Assert.assertEquals(brokerRequest.getFilterQuery().getColumn(), "col");
   }
+
+  @Test
+  public void testNonAggregationGroupByQuery() {
+    PinotQuery2BrokerRequestConverter converter = new PinotQuery2BrokerRequestConverter();
+    String query = "SELECT col1 FROM foo GROUP BY col1";
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    BrokerRequest brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col1");
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"), "col1");
+
+    query = "SELECT col1, col2 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col1");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(1).getIdentifier().getName(), "col2");
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"), "col1:col2");
+
+    query = "SELECT col1+col2*5 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperator(), "PLUS");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col1");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperator(), "TIMES");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col2");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(1).getLiteral().getLongValue(), 5L);
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"), "plus(col1,times(col2,'5'))");
+
+    query = "SELECT col1+col2*5 AS col3 FROM foo GROUP BY col1, col2";
+    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    brokerRequest = converter.convert(pinotQuery);
+    Assert.assertEquals(pinotQuery.getSelectListSize(), 1);
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperator().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperator(), "PLUS");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col1");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperator(), "TIMES");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(0).getIdentifier().getName(), "col2");
+    Assert.assertEquals(pinotQuery.getSelectList().get(0).getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(1).getFunctionCall().getOperands().get(1).getLiteral().getLongValue(), 5L);
+
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().size(), 1);
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationType().toUpperCase(), "DISTINCT");
+    Assert.assertEquals(brokerRequest.getAggregationsInfo().get(0).getAggregationParams().get("column"), "plus(col1,times(col2,'5'))");
+  }
+
+  @Test(expectedExceptions = SqlCompilationException.class)
+  public void testInvalidNonAggregationGroupBy() {
+    // Not support Aggregation functions in case statements.
+    try {
+      CalciteSqlParser.compileToPinotQuery("SELECT col1+col2 FROM foo GROUP BY col1");
+    } catch (SqlCompilationException e) {
+      Assert.assertEquals(e.getMessage(),
+          "For non-aggregation group by query, all the identifiers in select clause should be in groupBys. Found identifier: [col2]");
+      throw e;
+    }
+  }
 }
\ No newline at end of file
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 51ad264..619302c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -64,7 +64,7 @@ public abstract class BaseQueriesTest {
    * <p>Use this to test a single operator.
    */
   @SuppressWarnings({"rawtypes", "unchecked"})
-  protected <T extends Operator> T getOperatorForQuery(String pqlQuery) {
+  protected <T extends Operator> T getOperatorForPqlQuery(String pqlQuery) {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
     return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), queryContext).run();
   }
@@ -74,8 +74,27 @@ public abstract class BaseQueriesTest {
    * <p>Use this to test a single operator.
    */
   @SuppressWarnings("rawtypes")
-  protected <T extends Operator> T getOperatorForQueryWithFilter(String pqlQuery) {
-    return getOperatorForQuery(pqlQuery + getFilter());
+  protected <T extends Operator> T getOperatorForPqlQueryWithFilter(String pqlQuery) {
+    return getOperatorForPqlQuery(pqlQuery + getFilter());
+  }
+
+  /**
+   * Run SQL query on single index segment.
+   * <p>Use this to test a single operator.
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected <T extends Operator> T getOperatorForSqlQuery(String sqlQuery) {
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(sqlQuery);
+    return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), queryContext).run();
+  }
+
+  /**
+   * Run SQL query with hard-coded filter on single index segment.
+   * <p>Use this to test a single operator.
+   */
+  @SuppressWarnings("rawtypes")
+  protected <T extends Operator> T getOperatorForSqlQueryWithFilter(String sqlQuery) {
+    return getOperatorForSqlQuery(sqlQuery + getFilter());
   }
 
   /**
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index d0be665..19afd73 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -179,18 +179,15 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   <li>Selecting some columns with filter that does not match any record</li>
    * </ul>
    */
-  @Test
-  public void testDistinctInnerSegment()
+  private void testDistinctInnerSegmentHelper(String[] queries, boolean isPql)
       throws Exception {
     _indexSegment = createSegment(0, generateRecords(0));
     try {
       {
         // Test selecting all columns
-        String query =
-            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[0], isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(),
             new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
@@ -220,11 +217,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter
-        String query =
-            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[1], isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -250,10 +245,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns order by BYTES column
-        String query = "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[2], isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "bytesColumn"});
         assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
@@ -274,11 +268,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test selecting some columns with transform, filter, order-by and limit. Spaces in 'add' are intentional
         // to ensure that AggregationFunction arguments are standardized (to remove spaces).
-        String query =
-            "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
 
         // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[3], isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -297,11 +289,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter that does not match any record
-        String query =
-            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
 
         // Check data schema, where data type should be STRING for all columns
-        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[4], isPql);
         DataSchema dataSchema = distinctTable.getDataSchema();
         assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
         assertEquals(dataSchema.getColumnDataTypes(),
@@ -315,11 +305,63 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     }
   }
 
+
+  /**
+   * Test DISTINCT query within a single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   * </ul>
+   */
+  @Test
+  public void testDistinctInnerSegment()
+      throws Exception {
+    testDistinctInnerSegmentHelper(new String[]{
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10"
+    }, true);
+  }
+
+  /**
+   * Test Non-Aggregation GroupBy query rewrite to Distinct query within a single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   * </ul>
+   */
+  @Test
+  public void testNonAggGroupByRewriteToDistinctInnerSegment()
+      throws Exception {
+    testDistinctInnerSegmentHelper(new String[]{
+        "SELECT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
+        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, bytesColumn ORDER BY bytesColumn LIMIT 5",
+        "SELECT ADD ( intColumn,  floatColumn  ), stringColumn FROM testTable WHERE longColumn < 60 GROUP BY ADD ( intColumn,  floatColumn  ), stringColumn ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10"
+    }, false);
+  }
+
   /**
    * Helper method to get the DistinctTable result for one single segment for the given query.
    */
-  private DistinctTable getDistinctTableInnerSegment(String query) {
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+  private DistinctTable getDistinctTableInnerSegment(String query, boolean isPql) {
+    AggregationOperator aggregationOperator;
+    if (isPql) {
+      aggregationOperator = getOperatorForPqlQuery(query);
+    } else {
+      aggregationOperator = getOperatorForSqlQuery(query);
+    }
     List<Object> aggregationResult = aggregationOperator.nextBlock().getAggregationResult();
     assertNotNull(aggregationResult);
     assertEquals(aggregationResult.size(), 1);
@@ -347,8 +389,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   </li>
    * </ul>
    */
-  @Test
-  public void testDistinctInterSegment()
+  private void testDistinctInterSegmentHelper(String[] pqlQueries, String[] sqlQueries)
       throws Exception {
     ImmutableSegment segment0 = createSegment(0, generateRecords(0));
     ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
@@ -356,10 +397,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     try {
       {
         // Test selecting all columns
-        String pqlQuery =
-            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
-        String sqlQuery =
-            "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable LIMIT 10000";
+        String pqlQuery = pqlQueries[0];
+        String sqlQuery = sqlQueries[0];
 
         // Check data schema
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -411,10 +450,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter
-        String pqlQuery =
-            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
-        String sqlQuery =
-            "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+        String pqlQuery = pqlQueries[1];
+        String sqlQuery = sqlQueries[1];
 
         // Check data schema
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -460,8 +497,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns order by BYTES column
-        String pqlQuery = "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5";
-        String sqlQuery = "SELECT DISTINCT intColumn, bytesColumn FROM testTable ORDER BY bytesColumn LIMIT 5";
+        String pqlQuery = pqlQueries[2];
+        String sqlQuery = sqlQueries[2];
 
         // Check data schema
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -498,10 +535,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with transform, filter, order-by and limit
-        String pqlQuery =
-            "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
-        String sqlQuery =
-            "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+        String pqlQuery = pqlQueries[3];
+        String sqlQuery = sqlQueries[3];
 
         // Check data schema
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -537,10 +572,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       }
       {
         // Test selecting some columns with filter that does not match any record
-        String pqlQuery =
-            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
-        String sqlQuery =
-            "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+        String pqlQuery = pqlQueries[4];
+        String sqlQuery = sqlQueries[4];
 
         // Check data schema, where data type should be STRING for all columns
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -564,10 +597,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test selecting some columns with filter that does not match any record in one segment but matches some
         // records in the other segment
-        String pqlQuery =
-            "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
-        String sqlQuery =
-            "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
+        String pqlQuery = pqlQueries[5];
+        String sqlQuery = sqlQueries[5];
 
         // Check data schema
         BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
@@ -599,10 +630,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       {
         // Test electing some columns with filter that does not match any record in one server but matches some records
         // in the other server
-        String pqlQuery =
-            "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
-        String sqlQuery =
-            "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
+        String pqlQuery = pqlQueries[6];
+        String sqlQuery = sqlQueries[6];
 
         QueryContext pqlQueryContext = QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
         BrokerResponseNative pqlResponse = queryServersWithDifferentSegments(pqlQueryContext, segment0, segment1);
@@ -644,6 +673,93 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   }
 
   /**
+   * Test DISTINCT query across multiple segments and servers (2 servers, each with 2 segments).
+   * <p>Both PQL and SQL format are tested.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one segment but matches some records in
+   *     the other segment
+   *   </li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one server but matches some records in the
+   *     other server
+   *   </li>
+   * </ul>
+   */
+  @Test
+  public void testDistinctInterSegment()
+      throws Exception {
+    String[] pqlQueries = new String[] {
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5",
+    };
+    String[] sqlQueries = new String[] {
+        "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable LIMIT 10000",
+        "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT intColumn, bytesColumn FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5",
+    };
+    testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
+  }
+
+  /**
+   * Test Non-Aggregation GroupBy query rewrite to Distinct query across multiple segments and servers (2 servers, each with 2 segments).
+   * <p>Only SQL format are tested.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one segment but matches some records in
+   *     the other segment
+   *   </li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one server but matches some records in the
+   *     other server
+   *   </li>
+   * </ul>
+   */
+  @Test
+  public void testNonAggGroupByRewriteToDistinctInterSegment()
+      throws Exception {
+    String[] pqlQueries = new String[] {
+        "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
+        "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
+        "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5",
+    };
+    String[] sqlQueries = new String[] {
+        "SELECT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
+        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, bytesColumn ORDER BY bytesColumn LIMIT 5",
+        "SELECT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 GROUP BY ADD(intColumn, floatColumn), stringColumn ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10",
+        "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5",
+        "SELECT longColumn FROM testTable WHERE doubleColumn < 200 GROUP BY longColumn ORDER BY longColumn DESC LIMIT 5",
+    };
+    testDistinctInterSegmentHelper(pqlQueries, sqlQueries);
+  }
+  /**
    * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
    * 2 copies of segment1.
    */
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
index 386ae27..7429f3d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
@@ -107,7 +107,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
 
     // Test inner segment queries
     // Test base query
-    AggregationOperator aggregationOperator = getOperatorForQuery(BASE_QUERY);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(BASE_QUERY);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     ExecutionStatistics executionStatistics = aggregationOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 30000L, 0L, 60000L, 30000L);
@@ -115,7 +115,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
     Assert.assertEquals(((HyperLogLog) aggregationResult.get(0)).cardinality(), 21L);
     Assert.assertEquals(((HyperLogLog) aggregationResult.get(1)).cardinality(), 1762L);
     // Test query with filter
-    aggregationOperator = getOperatorForQueryWithFilter(BASE_QUERY);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(BASE_QUERY);
     resultsBlock = aggregationOperator.nextBlock();
     executionStatistics = aggregationOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 6129L, 84134L, 12258L, 30000L);
@@ -123,7 +123,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
     Assert.assertEquals(((HyperLogLog) aggregationResult.get(0)).cardinality(), 17L);
     Assert.assertEquals(((HyperLogLog) aggregationResult.get(1)).cardinality(), 1197L);
     // Test query with group-by
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(BASE_QUERY + GROUP_BY);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(BASE_QUERY + GROUP_BY);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     executionStatistics = aggregationGroupByOperator.getExecutionStatistics();
     QueriesTestUtils.testInnerSegmentExecutionStatistics(executionStatistics, 30000L, 0L, 90000L, 30000L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
index 16cb48e..670d6b1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationMultiValueQueriesTest.java
@@ -45,7 +45,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 100000L, 0L, 400000L,
@@ -55,7 +55,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
             1182655, 83439903673981L, 100000L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 15620L, 275416, 62480L,
@@ -70,7 +70,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + MULTI_VALUE_AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 100000L, 0L, 200000L,
@@ -80,7 +80,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
             201, 121081150452570L, 106688L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 15620L, 275416L, 31240L,
@@ -95,7 +95,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + SMALL_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 100000L, 0L, 500000L,
@@ -105,7 +105,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
             2100941020, 117939666, 23061775005L, 26L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 15620L, 275416L,
@@ -120,7 +120,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + MEDIUM_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 100000L, 0L, 700000L,
@@ -130,7 +130,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
             1095214422L, 1547156787, 528554902, 52058876L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 15620L, 275416L,
@@ -145,7 +145,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 100000L, 0L, 700000L,
@@ -154,7 +154,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
         "240129976\tL\t2147483647\t2147483647", 1L, 240129976L, 1649812746, 2077178039, 1952924139L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 15620L, 275416L,
@@ -168,7 +168,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
     String query = "SELECT" + AGGREGATION + " FROM testTable" + VERY_LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 100000L, 0L, 700000L,
@@ -178,7 +178,7 @@ public class InnerSegmentAggregationMultiValueQueriesTest extends BaseMultiValue
         675163196L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 15620L, 275416L,
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 27dde46..9d5408c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -49,7 +49,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable";
 
     // Test query without filter.
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 30000L, 0L, 120000L, 30000L);
@@ -58,7 +58,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
             1689277, 28175373944314L, 30000L);
 
     // Test query with filter.
-    aggregationOperator = getOperatorForQueryWithFilter(query);
+    aggregationOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), 6129L, 84134L, 24516L,
@@ -73,7 +73,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + SMALL_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 30000L, 0L, 150000L,
@@ -83,7 +83,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
             1215316262, 1328642550, 788414092L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 6129L, 84134L, 30645L,
@@ -98,7 +98,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + MEDIUM_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 30000L, 0L, 210000L,
@@ -108,7 +108,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
             4L, 2062187196L, 1988589001, 394608493, 4782388964L, 4L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 6129L, 84134L, 42903L,
@@ -122,7 +122,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 30000L, 0L, 210000L,
@@ -131,7 +131,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         "484569489\t16200443\t1159557463\tP\tMaztCmmxxgguBUxPti", 2L, 969138978L, 995355481, 16200443, 2222394270L, 2L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 6129L, 84134L, 42903L,
@@ -145,7 +145,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     String query = "SELECT" + AGGREGATION + " FROM testTable" + VERY_LARGE_GROUP_BY;
 
     // Test query without filter.
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 30000L, 0L, 270000L,
@@ -155,7 +155,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
         204243323, 628170461, 1985159279L, 1L);
 
     // Test query with filter.
-    aggregationGroupByOperator = getOperatorForQueryWithFilter(query);
+    aggregationGroupByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = aggregationGroupByOperator.nextBlock();
     QueriesTestUtils
         .testInnerSegmentExecutionStatistics(aggregationGroupByOperator.getExecutionStatistics(), 6129L, 84134L, 55161L,
@@ -174,7 +174,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
   @Test
   public void testSingleColumnDistinct() {
     String query = "SELECT DISTINCT(column1) FROM testTable LIMIT 1000000";
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
@@ -206,7 +206,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
   @Test
   public void testMultiColumnDistinct() {
     String query = "SELECT DISTINCT(column1, column3) FROM testTable LIMIT 1000000";
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 8f11ee9..c264a85 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -41,7 +41,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT * FROM testTable LIMIT 0";
 
     // Test query without filter
-    EmptySelectionOperator emptySelectionOperator = getOperatorForQuery(query);
+    EmptySelectionOperator emptySelectionOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = emptySelectionOperator.nextBlock();
     ExecutionStatistics executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -61,7 +61,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
-    emptySelectionOperator = getOperatorForQueryWithFilter(query);
+    emptySelectionOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = emptySelectionOperator.nextBlock();
     executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -85,7 +85,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -110,7 +110,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -140,7 +140,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -165,7 +165,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(firstRow[columnIndexMap.get("column6")], new int[]{2147483647});
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -195,7 +195,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 100000L);
@@ -221,7 +221,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 15620L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 76e22dd..607857e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -41,7 +41,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * FROM testTable LIMIT 0";
 
     // Test query without filter
-    EmptySelectionOperator emptySelectionOperator = getOperatorForQuery(query);
+    EmptySelectionOperator emptySelectionOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = emptySelectionOperator.nextBlock();
     ExecutionStatistics executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -60,7 +60,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertTrue(resultsBlock.getSelectionResult().isEmpty());
 
     // Test query with filter
-    emptySelectionOperator = getOperatorForQueryWithFilter(query);
+    emptySelectionOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = emptySelectionOperator.nextBlock();
     executionStatistics = emptySelectionOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 0L);
@@ -85,7 +85,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -110,7 +110,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals((String) firstRow[columnIndexMap.get("column11")], "P");
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -140,7 +140,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOnlyOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOnlyOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -165,7 +165,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals((String) firstRow[2], "P");
 
     // Test query with filter
-    selectionOnlyOperator = getOperatorForQueryWithFilter(query);
+    selectionOnlyOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOnlyOperator.nextBlock();
     executionStatistics = selectionOnlyOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 10L);
@@ -194,7 +194,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT" + SELECTION + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -220,7 +220,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
@@ -251,7 +251,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * " + " FROM testTable" + ORDER_BY;
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -278,7 +278,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
@@ -310,7 +310,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     String query = "SELECT * " + " FROM testTable" + ORDER_BY + " LIMIT 5000, 7000";
 
     // Test query without filter
-    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForQuery(query);
+    BaseOperator<IntermediateResultsBlock> selectionOrderByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = selectionOrderByOperator.nextBlock();
     ExecutionStatistics executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 30000L);
@@ -337,7 +337,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals((int) lastRow[columnIndexMap.get("column1")], 1715964282);
 
     // Test query with filter
-    selectionOrderByOperator = getOperatorForQueryWithFilter(query);
+    selectionOrderByOperator = getOperatorForPqlQueryWithFilter(query);
     resultsBlock = selectionOrderByOperator.nextBlock();
     executionStatistics = selectionOrderByOperator.getExecutionStatistics();
     Assert.assertEquals(executionStatistics.getNumDocsScanned(), 6129L);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index d1e7023..bb17808 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -164,7 +164,7 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery(0));
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(getAggregationQuery(0));
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     Assert.assertNotNull(aggregationResult);
@@ -193,7 +193,7 @@ public class PercentileTDigestQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentGroupBy() {
     // For inner segment case, percentile does not affect the intermediate result
-    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getGroupByQuery(0));
+    AggregationGroupByOperator groupByOperator = getOperatorForPqlQuery(getGroupByQuery(0));
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
     Assert.assertNotNull(groupByResult);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
index 99f3a90..cd97723 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
@@ -195,7 +195,7 @@ public class RangePredicateWithSortedInvertedIndexTest extends BaseQueriesTest {
   }
 
   private void runQuery(String query, int count, List<Pairs.IntPair> intPairs, int numColumns) {
-    SelectionOnlyOperator operator = getOperatorForQuery(query);
+    SelectionOnlyOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock block = operator.nextBlock();
     Collection<Object[]> rows = block.getSelectionResult();
     assertNotNull(rows, ERROR_MESSAGE);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index b1572b2..6b7bafb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -225,7 +225,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentAggregation()
       throws Exception {
-    AggregationOperator aggregationOperator = getOperatorForQuery(getAggregationQuery());
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(getAggregationQuery());
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     assertNotNull(aggregationResult);
@@ -386,7 +386,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentSVGroupBy()
       throws Exception {
-    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getSVGroupByQuery());
+    AggregationGroupByOperator groupByOperator = getOperatorForPqlQuery(getSVGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
     assertNotNull(groupByResult);
@@ -582,7 +582,7 @@ public class SerializedBytesQueriesTest extends BaseQueriesTest {
   @Test
   public void testInnerSegmentMVGroupBy()
       throws Exception {
-    AggregationGroupByOperator groupByOperator = getOperatorForQuery(getMVGroupByQuery());
+    AggregationGroupByOperator groupByOperator = getOperatorForPqlQuery(getMVGroupByQuery());
     IntermediateResultsBlock resultsBlock = groupByOperator.nextBlock();
     AggregationGroupByResult groupByResult = resultsBlock.getAggregationGroupByResult();
     assertNotNull(groupByResult);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index e8ab6d4..fb44226 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -1066,7 +1066,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {  private static fin
   private void testTextSearchSelectQueryHelper(String query, int expectedResultSize, boolean compareGrepOutput,
       List<Serializable[]> expectedResults)
       throws Exception {
-    SelectionOnlyOperator operator = getOperatorForQuery(query);
+    SelectionOnlyOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock operatorResult = operator.nextBlock();
     List<Object[]> resultset = (List<Object[]>) operatorResult.getSelectionResult();
     Assert.assertNotNull(resultset);
@@ -1108,7 +1108,7 @@ public class TextSearchQueriesTest extends BaseQueriesTest {  private static fin
   }
 
   private void testTextSearchAggregationQueryHelper(String query, int expectedCount) {
-    AggregationOperator operator = getOperatorForQuery(query);
+    AggregationOperator operator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock operatorResult = operator.nextBlock();
     long count = (Long) operatorResult.getAggregationResult().get(0);
     Assert.assertEquals(expectedCount, count);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 0b5c9f3..1127f93 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -185,7 +185,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
   }
 
   private void runAndVerifyInnerSegmentQuery(String query, double expectedSum, int expectedCount) {
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
     List<Object> aggregationResult = resultsBlock.getAggregationResult();
     assertNotNull(aggregationResult);
@@ -210,7 +210,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
   }
 
   private void verifyDateTruncationResult(String query, String expectedStringKey) {
-    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForQuery(query);
+    AggregationGroupByOperator aggregationGroupByOperator = getOperatorForPqlQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationGroupByOperator.nextBlock();
     AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
     assertNotNull(aggregationGroupByResult);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 64b1c47..c9f5bac 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -857,7 +857,7 @@ public class ClusterIntegrationTestUtils {
     // compare results
     BrokerRequest brokerRequest =
         PinotQueryParserFactory.get(CommonConstants.Broker.Request.SQL).compileToBrokerRequest(pinotQuery);
-    if (brokerRequest.getSelections() != null) { // selection
+    if (isSelectionQuery(brokerRequest)) { // selection
       // TODO: compare results for selection queries, w/o order by
 
       // Compare results for selection queries, with order by
@@ -867,9 +867,9 @@ public class ClusterIntegrationTestUtils {
           return;
         }
         Set<String> orderByColumns =
-            CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getOrderByList());
+            CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getOrderByList(), false);
         Set<String> selectionColumns =
-            CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getSelectList());
+            CalciteSqlParser.extractIdentifiers(brokerRequest.getPinotQuery().getSelectList(), false);
         if (!selectionColumns.containsAll(orderByColumns)) {
           // Selection columns has no overlap with order by column, don't compare.
           return;
@@ -881,7 +881,7 @@ public class ClusterIntegrationTestUtils {
               String brokerValue = brokerResponseRows.get(i).get(c).asText();
               String connectionValue = resultTableResultSet.getString(i, c);
               if (orderByColumns.containsAll(CalciteSqlParser
-                  .extractIdentifiers(Arrays.asList(brokerRequest.getPinotQuery().getSelectList().get(c))))) {
+                  .extractIdentifiers(Arrays.asList(brokerRequest.getPinotQuery().getSelectList().get(c)), false))) {
                 boolean error = fuzzyCompare(h2Value, brokerValue, connectionValue);
                 if (error) {
                   String failureMessage =
@@ -962,6 +962,16 @@ public class ClusterIntegrationTestUtils {
     }
   }
 
+  private static boolean isSelectionQuery(BrokerRequest brokerRequest) {
+    if (brokerRequest.getSelections() != null) {
+      return true;
+    }
+    if (brokerRequest.getAggregationsInfo() != null && brokerRequest.getAggregationsInfo().get(0).getAggregationType().equalsIgnoreCase("DISTINCT")) {
+      return true;
+    }
+    return false;
+  }
+
   private static boolean fuzzyCompare(String h2Value, String brokerValue, String connectionValue) {
     // Fuzzy compare expected value and actual value
     boolean error = false;
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 3cde6b5..774f209 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1074,18 +1074,59 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     String pql = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
     String sql = "SELECT DISTINCT Carrier FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID, DestStateName) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID, DestStateName FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
 
     pql = "SELECT DISTINCT(Carrier, DestAirportID, DestCityName) FROM mytable LIMIT 1000000";
     sql = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable";
     testQuery(pql, Collections.singletonList(sql));
+    pql = "SELECT DISTINCT Carrier, DestAirportID, DestCityName FROM mytable LIMIT 1000000";
+    testSqlQuery(pql, Collections.singletonList(sql));
+  }
+
+  @Test
+  public void testNonAggregationGroupByQuery()
+      throws Exception {
+    // by default 10 rows will be returned, so use high limit
+    String pql = "SELECT Carrier FROM mytable GROUP BY Carrier LIMIT 1000000";
+    String sql = "SELECT Carrier FROM mytable GROUP BY Carrier";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID FROM mytable GROUP BY Carrier, DestAirportID";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID, DestStateName FROM mytable GROUP BY Carrier, DestAirportID, DestStateName";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName LIMIT 1000000";
+    sql = "SELECT Carrier, DestAirportID, DestCityName FROM mytable GROUP BY Carrier, DestAirportID, DestCityName";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime-DepTime FROM mytable GROUP BY ArrTime, DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime-DepTime,ArrTime/3,DepTime*2 FROM mytable GROUP BY ArrTime, DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
+
+    pql = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime LIMIT 1000000";
+    sql = "SELECT ArrTime+DepTime FROM mytable GROUP BY ArrTime + DepTime";
+    testSqlQuery(pql, Collections.singletonList(sql));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org