You are viewing a plain-text version of this content. The canonical version is available in the mailing-list archive (the original link was lost in this plain-text copy).
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/09/16 22:47:09 UTC
[pinot] branch master updated: [multistage] test using BaseClusterIntegrationTestSet (#9412)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d6665b8e5 [multistage] test using BaseClusterIntegrationTestSet (#9412)
2d6665b8e5 is described below
commit 2d6665b8e5fa0842ef67b3d9896c5e04ecad78e9
Author: Rong Rong <ro...@apache.org>
AuthorDate: Fri Sep 16 15:47:02 2022 -0700
[multistage] test using BaseClusterIntegrationTestSet (#9412)
by running the hard-coded query list first.
- make relRoot info available in the query plan
- use fields / collation / hints properly before returning results
- fix SQL conformance
- some other bug fixes
This PR also lists TODOs for features we don't support yet.
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../MultiStageBrokerRequestHandler.java | 27 +----
.../tests/ClusterIntegrationTestUtils.java | 11 +-
.../pinot/integration/tests/ClusterTest.java | 15 +++
.../tests/BaseClusterIntegrationTestSet.java | 120 +++++++++++++--------
.../tests/MultiStageEngineIntegrationTest.java | 45 +++-----
.../PinotLogicalSortFetchEliminationRule.java | 56 ----------
.../org/apache/pinot/query/QueryEnvironment.java | 13 +--
.../org/apache/pinot/query/planner/QueryPlan.java | 19 +++-
.../pinot/query/planner/logical/StagePlanner.java | 7 +-
.../org/apache/pinot/query/validate/Validator.java | 3 +-
.../pinot/query/service/QueryDispatcher.java | 44 +++++++-
.../pinot/query/runtime/QueryRunnerTest.java | 6 +-
.../pinot/query/runtime/QueryRunnerTestBase.java | 13 ---
13 files changed, 194 insertions(+), 185 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 2554c9ed60..238ce36570 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -40,9 +40,7 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
@@ -156,7 +154,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
- List<DataTable> queryResults = null;
+ ResultTable queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, DEFAULT_TIMEOUT_NANO);
} catch (Exception e) {
@@ -171,7 +169,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
+ (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
- brokerResponse.setResultTable(toResultTable(queryResults));
+ brokerResponse.setResultTable(queryResults);
requestContext.setQueryProcessingTime(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
return brokerResponse;
@@ -197,27 +195,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
throw new UnsupportedOperationException();
}
- private ResultTable toResultTable(List<DataTable> queryResult) {
- DataSchema resultDataSchema = null;
- List<Object[]> resultRows = new ArrayList<>();
- for (DataTable dataTable : queryResult) {
- resultDataSchema = resultDataSchema == null ? dataTable.getDataSchema() : resultDataSchema;
- int numColumns = resultDataSchema.getColumnNames().length;
- DataSchema.ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
- List<Object[]> rows = new ArrayList<>(dataTable.getNumberOfRows());
- for (int rowId = 0; rowId < dataTable.getNumberOfRows(); rowId++) {
- Object[] row = new Object[numColumns];
- Object[] rawRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
- for (int i = 0; i < numColumns; i++) {
- row[i] = resultColumnDataTypes[i].convertAndFormat(rawRow[i]);
- }
- rows.add(row);
- }
- resultRows.addAll(rows);
- }
- return new ResultTable(resultDataSchema, resultRows);
- }
-
@Override
public void start() {
// no-op
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 21abfd6430..697efedb37 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -555,8 +555,15 @@ public class ClusterIntegrationTestUtils {
static void testQuery(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection,
String h2Query, Connection h2Connection, @Nullable Map<String, String> headers)
throws Exception {
+ testQuery(pinotQuery, brokerUrl, pinotConnection, h2Query, h2Connection, headers, null);
+ }
+
+ static void testQuery(String pinotQuery, String brokerUrl, org.apache.pinot.client.Connection pinotConnection,
+ String h2Query, Connection h2Connection, @Nullable Map<String, String> headers,
+ @Nullable Map<String, String> extraJsonProperties)
+ throws Exception {
// broker response
- JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, headers);
+ JsonNode pinotResponse = ClusterTest.postQuery(pinotQuery, brokerUrl, headers, extraJsonProperties);
if (!pinotResponse.get("exceptions").isEmpty()) {
throw new RuntimeException("Got Exceptions from Query Response: " + pinotResponse);
}
@@ -638,7 +645,7 @@ public class ClusterIntegrationTestUtils {
return;
}
if (h2ResultSet.first()) {
- for (int i = 0; i < brokerResponseRows.size(); i++) {
+ for (int i = 0; i < numRows; i++) {
for (int c = 0; c < numColumns; c++) {
String h2Value = h2ResultSet.getString(c + 1);
String brokerValue = brokerResponseRows.get(i).get(c).asText();
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e940fd0f06..7dfc3134e3 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -42,6 +42,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
import org.apache.http.Header;
import org.apache.http.HttpStatus;
@@ -448,8 +449,22 @@ public abstract class ClusterTest extends ControllerTest {
*/
public static JsonNode postQuery(String query, String brokerBaseApiUrl, Map<String, String> headers)
throws Exception {
+ return postQuery(query, brokerBaseApiUrl, headers, null);
+ }
+
+ /**
+ * Queries the broker's sql query endpoint (/sql)
+ */
+ public static JsonNode postQuery(String query, String brokerBaseApiUrl, Map<String, String> headers,
+ Map<String, String> extraJsonProperties)
+ throws Exception {
ObjectNode payload = JsonUtils.newObjectNode();
payload.put("sql", query);
+ if (MapUtils.isNotEmpty(extraJsonProperties)) {
+ for (Map.Entry<String, String> extraProperty :extraJsonProperties.entrySet()) {
+ payload.put(extraProperty.getKey(), extraProperty.getValue());
+ }
+ }
return JsonUtils.stringToJsonNode(sendPostRequest(brokerBaseApiUrl + "/query/sql", payload.toString(), headers));
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index f4d9ef0b62..70a5fb7604 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -99,6 +99,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
}, 600_000L, "Failed to delete table data managers");
}
+ /**
+ * Test features supported in V2 Multi-stage Engine.
+ * - Some V1 features will not be supported.
+ * - Some V1 features will be added as V2 engine feature development progresses.
+ * @throws Exception
+ */
+ public void testHardcodedQueriesMultiStage()
+ throws Exception {
+ testHardcodedQueriesCommon();
+ }
+
+ /**
+ * Test hard-coded queries.
+ * @throws Exception
+ */
+ public void testHardcodedQueries()
+ throws Exception {
+ testHardcodedQueriesCommon();
+ testHardCodedQueriesV1();
+ }
+
/**
* Test hardcoded queries.
* <p>NOTE:
@@ -118,9 +139,11 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
* TODO: Selection queries, Aggregation Group By queries, Order By, Distinct
* This list is very basic right now (aggregations only) and needs to be enriched
*/
- public void testHardcodedQueries()
+ private void testHardcodedQueriesCommon()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE CarrierDelay=15 AND ArrDelay > CarrierDelay LIMIT 1";
+ String query;
+ String h2Query;
+ query = "SELECT COUNT(*) FROM mytable WHERE CarrierDelay=15 AND ArrDelay > CarrierDelay LIMIT 1";
testQuery(query);
query = "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM mytable WHERE CarrierDelay=15 AND "
+ "ArrDelay > CarrierDelay ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
@@ -134,24 +157,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
query = "SELECT count(*) FROM mytable WHERE AirlineID > 20355 AND OriginState BETWEEN 'PA' AND 'DE' AND DepTime <> "
+ "2202 LIMIT 21";
testQuery(query);
- query =
- "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = "
- + "'DL'";
- testQuery(query);
- query =
- "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
- + "ORDER BY ArrTime DESC";
- testQuery(query);
- query =
- "SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND DivAirportSeqIDs IN (1078102, 1142303,"
- + " 1530402, 1172102, 1291503) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
- String h2Query =
- "SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND (DivAirportSeqIDs__MV0 IN (1078102, "
- + "1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, "
- + "1291503) OR DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR "
- + "DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV4 IN "
- + "(1078102, 1142303, 1530402, 1172102, 1291503)) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000";
- testQuery(query, h2Query);
query = "SELECT MAX(Quarter), MAX(FlightNum) FROM mytable LIMIT 8";
h2Query = "SELECT MAX(Quarter),MAX(FlightNum) FROM mytable LIMIT 10000";
testQuery(query, h2Query);
@@ -181,22 +186,7 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
testQuery(query);
query = "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10 FROM mytable WHERE ArrTime - 100 > 0";
testQuery(query);
- query =
- "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ADD(ArrTime + 5, ArrDelay), ADD(ArrTime * 5, ArrDelay)"
- + " FROM mytable WHERE mult((ArrTime - 100), (5 + ArrDelay))> 0";
- h2Query =
- "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ArrTime + 5 + ArrDelay, ArrTime * 5 + ArrDelay FROM "
- + "mytable WHERE (ArrTime - 100) * (5 + ArrDelay)> 0";
- testQuery(query, h2Query);
- query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS min FROM myTable";
- testQuery(query);
-
- // LIKE
- query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'";
- testQuery(query);
- query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE 'C%'";
- testQuery(query);
- query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE '_h%'";
+ query = "SELECT COUNT(*) AS \"date\", MAX(ArrTime) AS \"group\", MIN(ArrTime) AS \"min\" FROM mytable";
testQuery(query);
// NOT
@@ -235,7 +225,59 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
query = "SELECT DaysSinceEpoch, MAX(ArrDelay) - MAX(AirTime) AS Diff FROM mytable GROUP BY DaysSinceEpoch HAVING "
+ "(Diff >= 300 AND Diff < 500) OR Diff < -500 ORDER BY Diff DESC";
testQuery(query);
+ }
+
+ private void testHardCodedQueriesV1()
+ throws Exception {
+ String query;
+ String h2Query;
+ // Escape quotes
+ // TODO: move to common when multistage support correct escaping strategy.
+ query = "SELECT DistanceGroup FROM mytable WHERE DATE_TIME_CONVERT(DaysSinceEpoch, '1:DAYS:EPOCH', "
+ + "'1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd''T''HH:mm:ss.SSS''Z''', '1:DAYS') = '2014-09-05T00:00:00.000Z'";
+ h2Query = "SELECT DistanceGroup FROM mytable WHERE DaysSinceEpoch = 16318 LIMIT 10000";
+ testQuery(query, h2Query);
+
+ // LIKE
+ // TODO: move to common when multistage support LIKE
+ query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'";
+ testQuery(query);
+ query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE 'C%'";
+ testQuery(query);
+ query = "SELECT count(*) FROM mytable WHERE DestCityName LIKE '_h%'";
+ testQuery(query);
+ // Non-Standard functions
+ // mult is not a standard function.
+ query =
+ "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ADD(ArrTime + 5, ArrDelay), ADD(ArrTime * 5, ArrDelay)"
+ + " FROM mytable WHERE mult((ArrTime - 100), (5 + ArrDelay))> 0";
+ h2Query =
+ "SELECT ArrTime, ArrTime + ArrTime * 9 - ArrTime * 10, ArrTime + 5 + ArrDelay, ArrTime * 5 + ArrDelay FROM "
+ + "mytable WHERE (ArrTime - 100) * (5 + ArrDelay)> 0";
+ testQuery(query, h2Query);
+ // TODO: move to common when multistage support CAST AS 'LONG', for now it must use: CAST AS BIGINT
+ query =
+ "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = "
+ + "'DL'";
+ testQuery(query);
+ query =
+ "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
+ + "ORDER BY ArrTime DESC";
+ testQuery(query);
+ // TODO: move to common when multistage support MV columns
+ query =
+ "SELECT DistanceGroup FROM mytable WHERE \"Month\" BETWEEN 1 AND 1 AND DivAirportSeqIDs IN (1078102, 1142303,"
+ + " 1530402, 1172102, 1291503) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10";
+ h2Query =
+ "SELECT DistanceGroup FROM mytable WHERE Month BETWEEN 1 AND 1 AND (DivAirportSeqIDs__MV0 IN (1078102, "
+ + "1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV1 IN (1078102, 1142303, 1530402, 1172102, "
+ + "1291503) OR DivAirportSeqIDs__MV2 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR "
+ + "DivAirportSeqIDs__MV3 IN (1078102, 1142303, 1530402, 1172102, 1291503) OR DivAirportSeqIDs__MV4 IN "
+ + "(1078102, 1142303, 1530402, 1172102, 1291503)) OR SecurityDelay IN (1, 0, 14, -9999) LIMIT 10000";
+ testQuery(query, h2Query);
+
+ // Non-Standard SQL syntax:
// IN_ID_SET
{
IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
@@ -270,12 +312,6 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
+ "DaysSinceEpoch = 16430)";
testQuery(notInSubqueryQuery, notInQuery);
}
-
- // Escape quotes
- query = "SELECT DistanceGroup FROM mytable WHERE DATE_TIME_CONVERT(DaysSinceEpoch, '1:DAYS:EPOCH', "
- + "'1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd''T''HH:mm:ss.SSS''Z''', '1:DAYS') = '2014-09-05T00:00:00.000Z'";
- h2Query = "SELECT DistanceGroup FROM mytable WHERE DaysSinceEpoch = 16318 LIMIT 10000";
- testQuery(query, h2Query);
}
/**
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
index a639023b69..93934523a6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java
@@ -18,9 +18,8 @@
*/
package org.apache.pinot.integration.tests;
-import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.ImmutableMap;
import java.io.File;
-import java.io.IOException;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.common.datatable.DataTableFactory;
@@ -29,16 +28,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest {
+public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestSet {
private static final String SCHEMA_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_single_value_columns.schema";
@@ -84,6 +80,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
DataTableFactory.setDataTableVersion(4);
}
+ @Test
+ @Override
+ public void testHardcodedQueriesMultiStage()
+ throws Exception {
+ super.testHardcodedQueriesMultiStage();
+ }
+
@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
@@ -97,33 +100,13 @@ public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTest
serverConf.setProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, 8422);
}
- @Test(dataProvider = "multiStageQueryEngineSqlTestSet")
- public void testMultiStageQuery(String sql, int expectedNumOfRows, int expectedNumOfColumns)
- throws IOException {
- JsonNode multiStageResponse = JsonUtils.stringToJsonNode(
- sendPostRequest(_brokerBaseApiUrl + "/query/sql",
- "{\"queryOptions\":\"useMultistageEngine=true\", \"sql\":\"" + sql + "\"}"));
- Assert.assertTrue(multiStageResponse.has("resultTable"));
- JsonNode jsonNode = multiStageResponse.get("resultTable");
- // TODO: assert actual result data payload.
- Assert.assertEquals(jsonNode.get("rows").size(), expectedNumOfRows);
- Assert.assertEquals(jsonNode.get("dataSchema").get("columnNames").size(), expectedNumOfColumns);
- }
-
- @DataProvider
- public Object[][] multiStageQueryEngineSqlTestSet() {
- return new Object[][] {
- new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>10000", 0, 73},
- new Object[]{"SELECT COUNT(*) FROM mytable_OFFLINE WHERE Carrier='AA'", 1, 1},
- new Object[]{"SELECT * FROM mytable_OFFLINE WHERE ArrDelay>1000", 2, 73},
- new Object[]{"SELECT CarrierDelay, ArrDelay FROM mytable_OFFLINE"
- + " WHERE CarrierDelay=15 AND ArrDelay>20", 172, 2},
- new Object[]{"SELECT * FROM mytable_OFFLINE AS a JOIN mytable_OFFLINE AS b ON a.Origin = b.Origin "
- + " WHERE a.Carrier='AA' AND a.ArrDelay>1000 AND b.ArrDelay>1000", 2, 146}
- };
+ @Override
+ protected void testQuery(String pinotQuery, String h2Query)
+ throws Exception {
+ ClusterIntegrationTestUtils.testQuery(pinotQuery, _brokerBaseApiUrl, getPinotConnection(), h2Query,
+ getH2Connection(), null, ImmutableMap.of("queryOptions", "useMultistageEngine=true"));
}
-
@AfterClass
public void tearDown()
throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
deleted file mode 100644
index e6a32c387b..0000000000
--- a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotLogicalSortFetchEliminationRule.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.tools.RelBuilderFactory;
-
-
-/**
- * Special rule for Pinot, Pinot's top level sort fetch doesn't guarantee order without order by clause.
- */
-public class PinotLogicalSortFetchEliminationRule extends RelOptRule {
- public static final PinotLogicalSortFetchEliminationRule INSTANCE =
- new PinotLogicalSortFetchEliminationRule(PinotRuleUtils.PINOT_REL_FACTORY);
-
- public PinotLogicalSortFetchEliminationRule(RelBuilderFactory factory) {
- super(operand(LogicalSort.class, any()), factory, null);
- }
-
- @Override
- public boolean matches(RelOptRuleCall call) {
- if (call.rels.length < 1) {
- return false;
- }
- if (call.rel(0) instanceof LogicalSort) {
- Sort sort = call.rel(0);
- return sort.collation.getFieldCollations().size() == 0;
- }
- return false;
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- Sort sort = call.rel(0);
- call.transformTo(sort.getInputs().get(0));
- }
-}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 87156476e9..0a661ac378 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -125,7 +125,7 @@ public class QueryEnvironment {
public QueryPlan planQuery(String sqlQuery, SqlNodeAndOptions sqlNodeAndOptions) {
try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
- RelNode relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
+ RelRoot relRoot = compileQuery(sqlNodeAndOptions.getSqlNode(), plannerContext);
return toDispatchablePlan(relRoot, plannerContext);
} catch (Exception e) {
throw new RuntimeException("Error composing query plan for: " + sqlQuery, e);
@@ -147,11 +147,11 @@ public class QueryEnvironment {
try (PlannerContext plannerContext = new PlannerContext(_config, _catalogReader, _typeFactory, _hepProgram)) {
SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
plannerContext.setOptions(sqlNodeAndOptions.getOptions());
- RelNode relRoot = compileQuery(explain.getExplicandum(), plannerContext);
+ RelRoot relRoot = compileQuery(explain.getExplicandum(), plannerContext);
SqlExplainFormat format = explain.getFormat() == null ? SqlExplainFormat.DOT : explain.getFormat();
SqlExplainLevel level =
explain.getDetailLevel() == null ? SqlExplainLevel.DIGEST_ATTRIBUTES : explain.getDetailLevel();
- return PlannerUtils.explainPlan(relRoot, format, level);
+ return PlannerUtils.explainPlan(relRoot.rel, format, level);
} catch (Exception e) {
throw new RuntimeException("Error explain query plan for: " + sqlQuery, e);
}
@@ -172,11 +172,12 @@ public class QueryEnvironment {
// --------------------------------------------------------------------------
@VisibleForTesting
- protected RelNode compileQuery(SqlNode sqlNode, PlannerContext plannerContext)
+ protected RelRoot compileQuery(SqlNode sqlNode, PlannerContext plannerContext)
throws Exception {
SqlNode validated = validate(sqlNode, plannerContext);
RelRoot relation = toRelation(validated, plannerContext);
- return optimize(relation, plannerContext);
+ RelNode optimized = optimize(relation, plannerContext);
+ return relation.withRel(optimized);
}
private SqlNode validate(SqlNode parsed, PlannerContext plannerContext)
@@ -212,7 +213,7 @@ public class QueryEnvironment {
}
}
- private QueryPlan toDispatchablePlan(RelNode relRoot, PlannerContext plannerContext) {
+ private QueryPlan toDispatchablePlan(RelRoot relRoot, PlannerContext plannerContext) {
// 5. construct a dispatchable query plan.
StagePlanner queryStagePlanner = new StagePlanner(plannerContext, _workerManager);
return queryStagePlanner.makePlan(relRoot);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index a93bddc4f7..b13194f4b8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.query.planner;
+import java.util.List;
import java.util.Map;
+import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.logical.LogicalPlanner;
import org.apache.pinot.query.planner.stage.StageNode;
@@ -35,10 +37,13 @@ import org.apache.pinot.query.planner.stage.StageNode;
* </ul>
*/
public class QueryPlan {
- private Map<Integer, StageNode> _queryStageMap;
- private Map<Integer, StageMetadata> _stageMetadataMap;
+ private final List<Pair<Integer, String>> _queryResultFields;
+ private final Map<Integer, StageNode> _queryStageMap;
+ private final Map<Integer, StageMetadata> _stageMetadataMap;
- public QueryPlan(Map<Integer, StageNode> queryStageMap, Map<Integer, StageMetadata> stageMetadataMap) {
+ public QueryPlan(List<Pair<Integer, String>> fields, Map<Integer, StageNode> queryStageMap,
+ Map<Integer, StageMetadata> stageMetadataMap) {
+ _queryResultFields = fields;
_queryStageMap = queryStageMap;
_stageMetadataMap = stageMetadataMap;
}
@@ -58,4 +63,12 @@ public class QueryPlan {
public Map<Integer, StageMetadata> getStageMetadataMap() {
return _stageMetadataMap;
}
+
+ /**
+ * Get the query result field.
+ * @return query result field.
+ */
+ public List<Pair<Integer, String>> getQueryResultFields() {
+ return _queryResultFields;
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 6212f68fa0..dd03a1cef8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -67,7 +67,8 @@ public class StagePlanner {
* @param relRoot relational plan root.
* @return dispatchable plan.
*/
- public QueryPlan makePlan(RelNode relRoot) {
+ public QueryPlan makePlan(RelRoot relRoot) {
+ RelNode relRootNode = relRoot.rel;
// clear the state
_queryStageMap = new HashMap<>();
_stageMetadataMap = new HashMap<>();
@@ -75,7 +76,7 @@ public class StagePlanner {
_stageIdCounter = 1;
// walk the plan and create stages.
- StageNode globalStageRoot = walkRelPlan(relRoot, getNewStageId());
+ StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
@@ -99,7 +100,7 @@ public class StagePlanner {
_workerManager.assignWorkerToStage(e.getKey(), e.getValue());
}
- return new QueryPlan(_queryStageMap, _stageMetadataMap);
+ return new QueryPlan(relRoot.fields, _queryStageMap, _stageMetadataMap);
}
// non-threadsafe
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
index f774976c21..cdbe6d0999 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/validate/Validator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.validate;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
@@ -31,6 +32,6 @@ public class Validator extends SqlValidatorImpl {
public Validator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory) {
// TODO: support BABEL validator. Currently parser conformance is set to use BABEL.
- super(opTab, catalogReader, typeFactory, Config.DEFAULT);
+ super(opTab, catalogReader, typeFactory, Config.DEFAULT.withSqlConformance(SqlConformanceEnum.LENIENT));
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
index df98061a80..49d3943bd5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.service;
+import com.google.common.annotations.VisibleForTesting;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
@@ -26,14 +27,17 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.util.Pair;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.common.datablock.DataBlockUtils;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.QueryPlan;
@@ -59,7 +63,7 @@ public class QueryDispatcher {
public QueryDispatcher() {
}
- public List<DataTable> submitAndReduce(long requestId, QueryPlan queryPlan,
+ public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
MailboxService<Mailbox.MailboxContent> mailboxService, long timeoutNano)
throws Exception {
// submit all the distributed stages.
@@ -70,7 +74,8 @@ public class QueryDispatcher {
queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(),
requestId, reduceNode.getSenderStageId(), reduceNode.getDataSchema(), mailboxService.getHostname(),
mailboxService.getMailboxPort());
- return reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
+ List<DataTable> resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutNano);
+ return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields());
}
public int submit(long requestId, QueryPlan queryPlan)
@@ -147,6 +152,41 @@ public class QueryDispatcher {
return resultDataBlocks;
}
+  /**
+   * Converts the data blocks received by the reduce stage into a broker {@link ResultTable}.
+   *
+   * <p>Only the columns referenced by {@code fields} are kept, in the order given by {@code fields}. Each pair
+   * holds the column index into the received data schema ({@code left}) and the output column name
+   * ({@code right}). Every kept value is passed through the output column type's {@code convertAndFormat}.
+   *
+   * @param queryResult data blocks returned by the reduce stage
+   * @param fields (input column index, output column name) pairs that select and name the output columns
+   * @return the assembled result table. NOTE(review): when {@code queryResult} is empty the schema is
+   *     {@code null}; confirm callers tolerate that.
+   */
+  public static ResultTable toResultTable(List<DataTable> queryResult, List<Pair<Integer, String>> fields) {
+    DataSchema resultSchema = null;
+    DataSchema.ColumnDataType[] resultColumnDataTypes = null;
+    int numColumns = 0;
+    List<Object[]> resultRows = new ArrayList<>();
+    for (DataTable dataTable : queryResult) {
+      if (resultSchema == null) {
+        // The output schema is derived once, from the first block; later blocks are assumed to share it.
+        resultSchema = toResultSchema(dataTable.getDataSchema(), fields);
+        resultColumnDataTypes = resultSchema.getColumnDataTypes();
+        numColumns = resultSchema.getColumnNames().length;
+      }
+      int numRows = dataTable.getNumberOfRows();
+      for (int rowId = 0; rowId < numRows; rowId++) {
+        Object[] rawRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId);
+        Object[] row = new Object[numColumns];
+        // Only the masked fields should be selected out. The result schema has one column per field, in field
+        // order, so the output type is looked up by output position (colId), while the raw value is read from
+        // the input column reference (colRef).
+        int colId = 0;
+        for (Pair<Integer, String> field : fields) {
+          int colRef = field.left;
+          row[colId] = resultColumnDataTypes[colId].convertAndFormat(rawRow[colRef]);
+          colId++;
+        }
+        resultRows.add(row);
+      }
+    }
+    return new ResultTable(resultSchema, resultRows);
+  }
+
+  /**
+   * Builds the output schema for the selected fields.
+   *
+   * <p>Output column {@code i} is named {@code fields.get(i).right} and takes the type of column
+   * {@code fields.get(i).left} in {@code inputSchema}.
+   *
+   * @param inputSchema schema of the received data blocks
+   * @param fields (input column index, output column name) pairs
+   * @return a schema with exactly {@code fields.size()} columns, in field order
+   */
+  private static DataSchema toResultSchema(DataSchema inputSchema, List<Pair<Integer, String>> fields) {
+    int numFields = fields.size();
+    String[] columnNames = new String[numFields];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numFields];
+    int idx = 0;
+    for (Pair<Integer, String> field : fields) {
+      columnNames[idx] = field.right;
+      columnDataTypes[idx] = inputSchema.getColumnDataType(field.left);
+      idx++;
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+ @VisibleForTesting
public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService,
List<ServerInstance> sendingInstances, long jobId, int stageId, DataSchema dataSchema, String hostname,
int port) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index d1feacf663..43ba64815d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -76,7 +76,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
}
}
Preconditions.checkNotNull(mailboxReceiveOperator);
- return toRows(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator));
+ return QueryDispatcher.toResultTable(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator),
+ queryPlan.getQueryResultFields()).getRows();
}
private List<Object[]> queryH2(String sql)
@@ -262,9 +263,12 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
new Object[]{"SELECT a.col2, COUNT(*), MAX(a.col3), MIN(a.col3), SUM(a.col3) FROM a GROUP BY a.col2 "
+ "HAVING COUNT(*) < 5 OR (COUNT(*) > 5 AND SUM(a.col3) >= 10)"
+ "OR (MIN(a.col3) != 20 AND SUM(a.col3) = 100)"},
+ new Object[]{"SELECT COUNT(*) AS Count, MAX(a.col3) AS \"max\" FROM a GROUP BY a.col2 "
+ + "HAVING Count > 1 AND \"max\" < 50"},
// Order-by
new Object[]{"SELECT a.col1, a.col3, b.col3 FROM a JOIN b ON a.col1 = b.col1 ORDER BY a.col3, b.col3 DESC"},
+ new Object[]{"SELECT MAX(a.col3) FROM a GROUP BY a.col2 ORDER BY MAX(a.col3) - MIN(a.col3)"},
// Test CAST
// - implicit CAST
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index b3785ac108..702e7b466d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -31,7 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
@@ -49,7 +48,6 @@ import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import static org.apache.pinot.core.query.selection.SelectionOperatorUtils.extractRowFromDataTable;
public class QueryRunnerTestBase {
@@ -71,17 +69,6 @@ public class QueryRunnerTestBase {
protected Connection _h2Connection;
- protected static List<Object[]> toRows(List<DataTable> dataTables) {
- List<Object[]> resultRows = new ArrayList<>();
- for (DataTable dataTable : dataTables) {
- int numRows = dataTable.getNumberOfRows();
- for (int rowId = 0; rowId < numRows; rowId++) {
- resultRows.add(extractRowFromDataTable(dataTable, rowId));
- }
- }
- return resultRows;
- }
-
protected Connection getH2Connection() {
Assert.assertNotNull(_h2Connection, "H2 Connection has not been initialized");
return _h2Connection;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org