You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/09/16 22:47:09 UTC

[pinot] branch master updated: [multistage] test using BaseClusterIntegrationTestSet (#9412)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d6665b8e5 [multistage] test using BaseClusterIntegrationTestSet (#9412)
2d6665b8e5 is described below

commit 2d6665b8e5fa0842ef67b3d9896c5e04ecad78e9
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Sep 16 15:47:02 2022 -0700

    [multistage] test using BaseClusterIntegrationTestSet (#9412)
    
    by running the hard-coded queries list first
    
    - make relRoot info available in query plan
    - use fields / collation / hints properly before returning results
    - fix SQL conformance
    - some other bug fixes
    
    This PR also lists TODOs for features we don't support right now.
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../MultiStageBrokerRequestHandler.java            |  27 +----
 .../tests/ClusterIntegrationTestUtils.java         |  11 +-
 .../pinot/integration/tests/ClusterTest.java       |  15 +++
 .../tests/BaseClusterIntegrationTestSet.java       | 120 +++++++++++++--------
 .../tests/MultiStageEngineIntegrationTest.java     |  45 +++-----
 .../PinotLogicalSortFetchEliminationRule.java      |  56 ----------
 .../org/apache/pinot/query/QueryEnvironment.java   |  13 +--
 .../org/apache/pinot/query/planner/QueryPlan.java  |  19 +++-
 .../pinot/query/planner/logical/StagePlanner.java  |   7 +-
 .../org/apache/pinot/query/validate/Validator.java |   3 +-
 .../pinot/query/service/QueryDispatcher.java       |  44 +++++++-
 .../pinot/query/runtime/QueryRunnerTest.java       |   6 +-
 .../pinot/query/runtime/QueryRunnerTestBase.java   |  13 ---
 13 files changed, 194 insertions(+), 185 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 2554c9ed60..238ce36570 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -40,9 +40,7 @@ import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
 import org.apache.pinot.query.catalog.PinotCatalog;
@@ -156,7 +154,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
       return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
     }
 
-    List<DataTable> queryResults = null;
+    ResultTable queryResults;
     try {
       queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, DEFAULT_TIMEOUT_NANO);
     } catch (Exception e) {
@@ -171,7 +169,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
         + (executionEndTimeNs - compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
-    brokerResponse.setResultTable(toResultTable(queryResults));
+    brokerResponse.setResultTable(queryResults);
     requestContext.setQueryProcessingTime(totalTimeMs);
     augmentStatistics(requestContext, brokerResponse);
     return brokerResponse;
@@ -197,27 +195,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
     throw new UnsupportedOperationException();
   }
 
-  private ResultTable toResultTable(List<DataTable> queryResult) {
-    DataSchema resultDataSchema = null;
-    List<Object[]> resultRows = new ArrayList<>();
-    for (DataTable dataTable : queryResult) {
-      resultDataSchema = resultDataSchema == null ? dataTable.getDataSchema() : resultDataSchema;
-      int numColumns = resultDataSchema.getColumnNames().length;
-      DataSchema.ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-      List<Object[]> rows = new ArrayList<>(dataTable.getNumberOfRows());
-      for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
-        Object[] row = new Object[numColumns];
-        Object[] rawRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
-        for (int i = 0; i < numColumns; i++) {
-          row[i] = resultColumnDataTypes[i].convertAndFormat(rawRow[i]);
-        }
-        rows.add(row);
-      }
-      resultRows.addAll(rows);
-    }
-    return new ResultTable(resultDataSchema, resultRows);
-  }
-
   @Override
   public void start() {
     // no-op
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 21abfd6430..697efedb37 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -555,8 +555,15 @@ public class ClusterIntegrationTestUtils {
   static void testQuery(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection,
       String h2Query, Connection h2Connection, @Nullable Map<String, String> headers)
       throws Exception {
+    testQuery(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, null);
+  }
+
+  static void testQuery(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection,
+      String h2Query, Connection h2Connection, @Nullable Map<String, String> headers,
+      @Nullable Map<String, String> extraJsonProperties)
+      throws Exception {
     // broker response
-    JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, headers);
+    JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, headers, extraJsonProperties);
     if (!pinotResponse.get("exceptions").isEmpty()) {
       throw new RuntimeException("Got Exceptions from Query Response: " + pinotResponse);
     }
@@ -638,7 +645,7 @@ public class ClusterIntegrationTestUtils {
             return;
           }
           if (h2ResultSet.first()) {
-            for (int i = 0; i < brokerResponseRows.size(); i++) {
+            for (int i = 0; i < numRows; i++) {
               for (int c = 0; c < numColumns; c++) {
                 String h2Value = h2ResultSet.getString(c + 1);
                 String brokerValue = brokerResponseRows.get(i).get(c).asText();
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e940fd0f06..7dfc3134e3 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -42,6 +42,7 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpStatus;
@@ -448,8 +449,22 @@ public abstract class ClusterTest extends ControllerTest {
    */
   public static JsonNode postQuery(String query, String brokerBaseApiUrl, Map<String, String> headers)
       throws Exception {
+    return postQuery(query, brokerBaseApiUrl, headers, null);
+  }
+
+  /**
+   * Queries the broker's sql query endpoint (/sql)
+   */
+  public static JsonNode postQuery(String query, String brokerBaseApiUrl, Map<String, String> headers,
+      Map<String, String> extraJsonProperties)
+      throws Exception {
     ObjectNode payload = JsonUtils.newObjectNode();
     payload.put("sql", query);
+    if (MapUtils.isNotEmpty(extraJsonProperties)) {
+      for (Map.Entry<String, String> extraProperty :extraJsonProperties.entrySet()) {
+        payload.put(extraProperty.getKey(), extraProperty.getValue());
+      }
+    }
     return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl + "/query/sql", payload.toString(), headers));
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index f4d9ef0b62..70a5fb7604 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -99,6 +99,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     }, 600_000L, "Failed to delete table data managers");
   }
 
+  /**
+   * Test features supported in V2 Multi-stage Engine.
+   * - Some V1 features will not be supported.
+   * - Some V1 features will be added as V2 engine feature development progresses.
+   * @throws Exception
+   */
+  public void testHardcodedQueriesMultiStage()
+      throws Exception {
+    testHardcodedQueriesCommon();
+  }
+
+  /**
+   * Test hard-coded queries.
+   * @throws Exception
+   */
+  public void testHardcodedQueries()
+      throws Exception {
+    testHardcodedQueriesCommon();
+    testHardCodedQueriesV1();
+  }
+
   /**
    * Test hardcoded queries.
    * <p>NOTE:
@@ -118,9 +139,11 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
    * TODO: Selection queries, Aggregation Group By queries, Order By, Distinct
    *  This list is very basic right now (aggregations only) and needs to be enriched
    */
-  public void testHardcodedQueries()
+  private void testHardcodedQueriesCommon()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE CarrierDelay=15 AND ArrDelay > CarrierDelay LIMIT 1";
+    String query;
+    String h2Query;
+    query = "SELECT COUNT(*) FROM mytable WHERE CarrierDelay=15 AND ArrDelay > CarrierDelay LIMIT 1";
     testQuery(query);
     query = "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM mytable WHERE CarrierDelay=15 AND "
         + "ArrDelay > CarrierDelay ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
@@ -134,24 +157,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     query = "SELECT count(*) FROM mytable WHERE AirlineID > 20355 AND OriginState BETWEEN 'PA' AND 'DE' AND DepTime <> "
         + "2202 LIMIT 21";
     testQuery(query);
-    query =
-        "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = "
-            + "'DL'";
-    testQuery(query);
-    query =
-        "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
-            + "ORDER BY ArrTime DESC";
-    testQuery(query);
-    query =
-        "SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND DivAirportSeqIDs IN (1078102, 1142303,"
-            + " 1530402, 1172102, 1291503) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
-    String h2Query =
-        "SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND (DivAirportSeqIDs__MV0 IN (1078102, "
-            + "1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, "
-            + "1291503) OR DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR "
-            + "DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV4 IN "
-            + "(1078102, 1142303, 1530402, 1172102, 1291503)) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000";
-    testQuery(query, h2Query);
     query = "SELECT MAX(Quarter), MAX(FlightNum) FROM mytable LIMIT 8";
     h2Query = "SELECT MAX(Quarter),MAX(FlightNum) FROM mytable LIMIT 10000";
     testQuery(query, h2Query);
@@ -181,22 +186,7 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     testQuery(query);
     query = "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10 FROM mytable WHERE ArrTime - 100 > 0";
     testQuery(query);
-    query =
-        "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ADD(ArrTime + 5, ArrDelay), ADD(ArrTime * 5, ArrDelay)"
-            + " FROM mytable WHERE mult((ArrTime - 100), (5 + ArrDelay))> 0";
-    h2Query =
-        "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ArrTime + 5 + ArrDelay, ArrTime * 5 + ArrDelay FROM "
-            + "mytable WHERE (ArrTime - 100) * (5 + ArrDelay)> 0";
-    testQuery(query, h2Query);
-    query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS min FROM myTable";
-    testQuery(query);
-
-    // LIKE
-    query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'";
-    testQuery(query);
-    query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE 'C%'";
-    testQuery(query);
-    query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE '_h%'";
+    query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS \"min\" FROM mytable";
     testQuery(query);
 
     // NOT
@@ -235,7 +225,59 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     query = "SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM mytable GROUP BY DaysSinceEpoch HAVING "
         + "(Diff >= 300 AND Diff < 500) OR Diff < -500 ORDER BY Diff DESC";
     testQuery(query);
+  }
+
+  private void testHardCodedQueriesV1()
+      throws Exception {
+    String query;
+    String h2Query;
+    // Escape quotes
+    // TODO: move to common when multistage supports the correct escaping strategy.
+    query = "SELECT DistanceGroup FROM mytable WHERE DATE_TIME_CONVERT(DaysSinceEpoch, '1:DAYS:EPOCH', "
+        + "'1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd''T''HH:mm:ss.SSS''Z''', '1:DAYS') = '2014-09-05T00:00:00.000Z'";
+    h2Query = "SELECT DistanceGroup FROM mytable WHERE DaysSinceEpoch = 16318 LIMIT 10000";
+    testQuery(query, h2Query);
+
+    // LIKE
+    // TODO: move to common when multistage supports LIKE
+    query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'";
+    testQuery(query);
+    query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE 'C%'";
+    testQuery(query);
+    query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE '_h%'";
+    testQuery(query);
 
+    // Non-Standard functions
+    // mult is not a standard function.
+    query =
+        "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ADD(ArrTime + 5, ArrDelay), ADD(ArrTime * 5, ArrDelay)"
+            + " FROM mytable WHERE mult((ArrTime - 100), (5 + ArrDelay))> 0";
+    h2Query =
+        "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ArrTime + 5 + ArrDelay, ArrTime * 5 + ArrDelay FROM "
+            + "mytable WHERE (ArrTime - 100) * (5 + ArrDelay)> 0";
+    testQuery(query, h2Query);
+    // TODO: move to common when multistage supports CAST AS 'LONG'; for now it must use: CAST AS BIGINT
+    query =
+        "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = "
+            + "'DL'";
+    testQuery(query);
+    query =
+        "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
+            + "ORDER BY ArrTime DESC";
+    testQuery(query);
+    // TODO: move to common when multistage supports MV columns
+    query =
+        "SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND DivAirportSeqIDs IN (1078102, 1142303,"
+            + " 1530402, 1172102, 1291503) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
+    h2Query =
+        "SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND (DivAirportSeqIDs__MV0 IN (1078102, "
+            + "1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, "
+            + "1291503) OR DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR "
+            + "DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV4 IN "
+            + "(1078102, 1142303, 1530402, 1172102, 1291503)) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000";
+    testQuery(query, h2Query);
+
+    // Non-Standard SQL syntax:
     // IN_ID_SET
     {
       IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
@@ -270,12 +312,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
               + "DaysSinceEpoch = 16430)";
       testQuery(notInSubqueryQuery, notInQuery);
     }
-
-    // Escape quotes
-    query = "SELECT DistanceGroup FROM mytable WHERE DATE_TIME_CONVERT(DaysSinceEpoch, '1:DAYS:EPOCH', "
-        + "'1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd''T''HH:mm:ss.SSS''Z''', '1:DAYS') = '2014-09-05T00:00:00.000Z'";
-    h2Query = "SELECT DistanceGroup FROM mytable WHERE DaysSinceEpoch = 16318 LIMIT 10000";
-    testQuery(query, h2Query);
   }
 
   /**
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index a639023b69..93934523a6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -29,16 +28,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
-public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest {
+public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final String SCHEMA_FILE_NAME =
       "On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
 
@@ -84,6 +80,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
     DataTableFactory.setDataTableVersion(4);
   }
 
+  @Test
+  @Override
+  public void testHardcodedQueriesMultiStage()
+      throws Exception {
+    super.testHardcodedQueriesMultiStage();
+  }
+
   @Override
   protected void overrideBrokerConf(PinotConfiguration brokerConf) {
     brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
@@ -97,33 +100,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
     serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
   }
 
-  @Test(dataProvider = "multiStageQueryEngineSqlTestSet")
-  public void testMultiStageQuery(String sql, int expectedNumOfRows, int expectedNumOfColumns)
-      throws IOException {
-    JsonNode multiStageResponse = JsonUtils.stringToJsonNode(
-        sendPostRequest(_brokerBaseApiUrl + "/query/sql",
-            "{\"queryOptions\":\"useMultistageEngine=true\", \"sql\":\"" + sql + "\"}"));
-    Assert.assertTrue(multiStageResponse.has("resultTable"));
-    JsonNode jsonNode = multiStageResponse.get("resultTable");
-    // TODO: assert actual result data payload.
-    Assert.assertEquals(jsonNode.get("rows").size(), expectedNumOfRows);
-    Assert.assertEquals(jsonNode.get("dataSchema").get("columnNames").size(), expectedNumOfColumns);
-  }
-
-  @DataProvider
-  public Object[][] multiStageQueryEngineSqlTestSet() {
-    return new Object[][] {
-        new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>10000", 0, 73},
-        new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1},
-        new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73},
-        new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE"
-            + " WHERE CarrierDelay=15 AND ArrDelay>20", 172, 2},
-        new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin "
-            + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146}
-    };
+  @Override
+  protected void testQuery(String pinotQuery, String h2Query)
+      throws Exception {
+    ClusterIntegrationTestUtils.testQuery(pinotQuery, _brokerBaseApiUrl, getPinotConnection(), h2Query,
+        getH2Connection(), null, ImmutableMap.of("queryOptions", "useMultistageEngine=true"));
   }
 
-
   @AfterClass
   public void tearDown()
       throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
deleted file mode 100644
index e6a32c387b..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.tools.RelBuilderFactory;
-
-
-/**
- * Special rule for Pinot, Pinot's top level sort fetch doesn't guarantee order without order by clause.
- */
-public class PinotLogicalSortFetchEliminationRule extends RelOptRule {
-  public static final PinotLogicalSortFetchEliminationRule INSTANCE =
-      new PinotLogicalSortFetchEliminationRule(PinotRuleUtils.PINOT_REL_FACTORY);
-
-  public PinotLogicalSortFetchEliminationRule(RelBuilderFactory factory) {
-    super(operand(LogicalSort.class, any()), factory, null);
-  }
-
-  @Override
-  public boolean matches(RelOptRuleCall call) {
-    if (call.rels.length < 1) {
-      return false;
-    }
-    if (call.rel(0) instanceof LogicalSort) {
-      Sort sort = call.rel(0);
-      return sort.collation.getFieldCollations().size() == 0;
-    }
-    return false;
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    Sort sort = call.rel(0);
-    call.transformTo(sort.getInputs().get(0));
-  }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 87156476e9..0a661ac378 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -125,7 +125,7 @@ public class QueryEnvironment {
   public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions) {
     try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
-      RelNode relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
+      RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
       return toDispatchablePlan(relRoot, plannerContext);
     } catch (Exception e) {
       throw new RuntimeException("Error composing query plan for: " + sqlQuery, e);
@@ -147,11 +147,11 @@ public class QueryEnvironment {
     try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
       SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
       plannerContext.setOptions(sqlNodeAndOptions.getOptions());
-      RelNode relRoot = compileQuery(explain.getExplicandum(), plannerContext);
+      RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
       SqlExplainFormat format = explain.getFormat() == null ? SqlExplainFormat.DOT : explain.getFormat();
       SqlExplainLevel level =
           explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
-      return PlannerUtils.explainPlan(relRoot, format, level);
+      return PlannerUtils.explainPlan(relRoot.rel, format, level);
     } catch (Exception e) {
       throw new RuntimeException("Error explain query plan for: " + sqlQuery, e);
     }
@@ -172,11 +172,12 @@ public class QueryEnvironment {
   // --------------------------------------------------------------------------
 
   @VisibleForTesting
-  protected RelNode compileQuery(SqlNode sqlNode, PlannerContext plannerContext)
+  protected RelRoot compileQuery(SqlNode sqlNode, PlannerContext plannerContext)
       throws Exception {
     SqlNode validated = validate(sqlNode, plannerContext);
     RelRoot relation = toRelation(validated, plannerContext);
-    return optimize(relation, plannerContext);
+    RelNode optimized = optimize(relation, plannerContext);
+    return relation.withRel(optimized);
   }
 
   private SqlNode validate(SqlNode parsed, PlannerContext plannerContext)
@@ -212,7 +213,7 @@ public class QueryEnvironment {
     }
   }
 
-  private QueryPlan toDispatchablePlan(RelNode relRoot, PlannerContext plannerContext) {
+  private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext plannerContext) {
     // 5. construct a dispatchable query plan.
     StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager);
     return queryStagePlanner.makePlan(relRoot);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index a93bddc4f7..b13194f4b8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.query.planner;
 
+import java.util.List;
 import java.util.Map;
+import org.apache.calcite.util.Pair;
 import org.apache.pinot.query.planner.logical.LogicalPlanner;
 import org.apache.pinot.query.planner.stage.StageNode;
 
@@ -35,10 +37,13 @@ import org.apache.pinot.query.planner.stage.StageNode;
  * </ul>
  */
 public class QueryPlan {
-  private Map<Integer, StageNode> _queryStageMap;
-  private Map<Integer, StageMetadata> _stageMetadataMap;
+  private final List<Pair<Integer, String>> _queryResultFields;
+  private final Map<Integer, StageNode> _queryStageMap;
+  private final Map<Integer, StageMetadata> _stageMetadataMap;
 
-  public QueryPlan(Map<Integer, StageNode> queryStageMap, Map<Integer, StageMetadata> stageMetadataMap) {
+  public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, StageNode> queryStageMap,
+      Map<Integer, StageMetadata> stageMetadataMap) {
+    _queryResultFields = fields;
     _queryStageMap = queryStageMap;
     _stageMetadataMap = stageMetadataMap;
   }
@@ -58,4 +63,12 @@ public class QueryPlan {
   public Map<Integer, StageMetadata> getStageMetadataMap() {
     return _stageMetadataMap;
   }
+
+  /**
+   * Get the query result fields.
+   * @return query result fields.
+   */
+  public List<Pair<Integer, String>> getQueryResultFields() {
+    return _queryResultFields;
+  }
 }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 6212f68fa0..dd03a1cef8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -67,7 +67,8 @@ public class StagePlanner {
    * @param relRoot relational plan root.
    * @return dispatchable plan.
    */
-  public QueryPlan makePlan(RelNode relRoot) {
+  public QueryPlan makePlan(RelRoot relRoot) {
+    RelNode relRootNode = relRoot.rel;
     // clear the state
     _queryStageMap = new HashMap<>();
     _stageMetadataMap = new HashMap<>();
@@ -75,7 +76,7 @@ public class StagePlanner {
     _stageIdCounter = 1;
 
     // walk the plan and create stages.
-    StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+    StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
 
     // global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
     // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
@@ -99,7 +100,7 @@ public class StagePlanner {
       _workerManager.assignWorkerToStage(e.getKey(), e.getValue());
     }
 
-    return new QueryPlan(_queryStageMap, _stageMetadataMap);
+    return new QueryPlan(relRoot.fields, _queryStageMap, _stageMetadataMap);
   }
 
   // non-threadsafe
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
index f774976c21..cdbe6d0999 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.validate;
 
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 
@@ -31,6 +32,6 @@ public class Validator extends SqlValidatorImpl {
 
   public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
     // TODO: support BABEL validator. Currently parser conformance is set to use BABEL.
-    super(opTab, catalogReader, typeFactory, Config.DEFAULT);
+    super(opTab, catalogReader, typeFactory, Config.DEFAULT.withSqlConformance(SqlConformanceEnum.LENIENT));
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index df98061a80..49d3943bd5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import java.util.ArrayList;
@@ -26,14 +27,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.util.Pair;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Mailbox;
 import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datablock.BaseDataBlock;
 import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -59,7 +63,7 @@ public class QueryDispatcher {
   public QueryDispatcher() {
   }
 
-  public List<DataTable> submitAndReduce(long requestId, QueryPlan queryPlan,
+  public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
       MailboxService<Mailbox.MailboxContent> mailboxService, long timeoutNano)
       throws Exception {
     // submit all the distributed stages.
@@ -70,7 +74,8 @@ public class QueryDispatcher {
         queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
         requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(),
         mailboxService.getMailboxPort());
-    return reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
+    List<DataTable> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
+    return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields());
   }
 
   public int submit(long requestId, QueryPlan queryPlan)
@@ -147,6 +152,41 @@ public class QueryDispatcher {
     return resultDataBlocks;
   }
 
+  /**
+   * Projects the reduced data tables onto the query result fields, converting each value via its column type.
+   * @param queryResult data tables to convert; the result schema is derived from the first one (null if empty).
+   * @param fields (input column index, output column name) pairs, in output order.
+   */
+  public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields) {
+    DataSchema resultSchema = null;
+    List<Object[]> resultRows = new ArrayList<>();
+    for (DataTable dataTable : queryResult) {
+      resultSchema = resultSchema == null ? toResultSchema(dataTable.getDataSchema(), fields) : resultSchema;
+      DataSchema.ColumnDataType[] resultColumnDataTypes = resultSchema.getColumnDataTypes();
+      for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
+        Object[] row = new Object[fields.size()];
+        Object[] rawRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+        // Types are indexed by output position (colId); only rawRow is indexed by the input column reference.
+        for (int colId = 0; colId < row.length; colId++) {
+          row[colId] = resultColumnDataTypes[colId].convertAndFormat(rawRow[fields.get(colId).left]);
+        }
+        resultRows.add(row);
+      }
+    }
+    return new ResultTable(resultSchema, resultRows);
+  }
+
+  /**
+   * Builds the output schema: column i is named fields[i].right and typed like input column fields[i].left.
+   */
+  private static DataSchema toResultSchema(DataSchema inputSchema, List<Pair<Integer, String>> fields) {
+    String[] colNames = fields.stream().map(f -> f.right).toArray(String[]::new);
+    DataSchema.ColumnDataType[] colTypes = fields.stream().map(f -> inputSchema.getColumnDataType(f.left))
+        .toArray(DataSchema.ColumnDataType[]::new);
+    return new DataSchema(colNames, colTypes);
+  }
+
+  @VisibleForTesting
   public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
       List<ServerInstance> sendingInstances, long jobId, int stageId, DataSchema dataSchema, String hostname,
       int port) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index d1feacf663..43ba64815d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -76,7 +76,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
       }
     }
     Preconditions.checkNotNull(mailboxReceiveOperator);
-    return toRows(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator));
+    return QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
+        queryPlan.getQueryResultFields()).getRows();
   }
 
   private List<Object[]> queryH2(String sql)
@@ -262,9 +263,12 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 "
             + "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
             + "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)"},
+        new Object[]{"SELECT COUNT(*) AS Count, MAX(a.col3) AS \"max\" FROM a GROUP BY a.col2 "
+            + "HAVING Count > 1 AND \"max\" < 50"},
 
         // Order-by
         new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 = b.col1 ORDER BY a.col3, b.col3 DESC"},
+        new Object[]{"SELECT MAX(a.col3) FROM a GROUP BY a.col2 ORDER BY MAX(a.col3) - MIN(a.col3)"},
 
         // Test CAST
         //   - implicit CAST
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index b3785ac108..702e7b466d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
@@ -49,7 +48,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 
-import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
 
 
 public class QueryRunnerTestBase {
@@ -71,17 +69,6 @@ public class QueryRunnerTestBase {
 
   protected Connection _h2Connection;
 
-  protected static List<Object[]> toRows(List<DataTable> dataTables) {
-    List<Object[]> resultRows = new ArrayList<>();
-    for (DataTable dataTable : dataTables) {
-      int numRows = dataTable.getNumberOfRows();
-      for (int rowId = 0; rowId < numRows; rowId++) {
-        resultRows.add(extractRowFromDataTable(dataTable, rowId));
-      }
-    }
-    return resultRows;
-  }
-
   protected Connection getH2Connection() {
     Assert.assertNotNull(_h2Connection, "H2 Connection has not been initialized");
     return _h2Connection;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org