You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/28 18:25:38 UTC
[incubator-pinot] 01/01: Add option to fail query when column mismatches
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch throw-exception-when-column-mismatch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e0987e566749cf93c2a60f36eb32b241b3b3374b
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Mon Sep 28 11:24:53 2020 -0700
Add option to fail query when column mismatches
---
.../requesthandler/BaseBrokerRequestHandler.java | 99 +++++++++++++++-------
.../pinot/common/exception/QueryException.java | 8 +-
.../pinot/common/utils/helix/TableCache.java | 12 +--
.../pinot/pql/parsers/pql2/ast/TopAstNode.java | 2 +-
.../api/resources/PinotQueryResource.java | 2 +-
.../tests/OfflineClusterIntegrationTest.java | 14 ++-
6 files changed, 95 insertions(+), 42 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 4925781..1e9f0b7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -46,6 +46,7 @@ import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.common.Utils;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -74,6 +75,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
@@ -182,8 +184,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
} catch (Exception e) {
LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
- requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
- return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
+ requestStatistics.setErrorCode(QueryException.QUERY_PARSING_ERROR_CODE);
+ return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, e));
}
if (isLiteralOnlyQuery(brokerRequest)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
@@ -196,9 +198,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
e.getMessage());
}
}
+
+ // Set extra settings into broker request
+ setOptions(requestId, query, request, brokerRequest);
+
updateTableName(brokerRequest);
try {
updateColumnNames(brokerRequest);
+ } catch(BadQueryRequestException be) {
+ return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, be));
} catch (Exception e) {
LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
}
@@ -283,9 +291,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
}
- // Set extra settings into broker request
- setOptions(requestId, query, request, brokerRequest);
-
// Optimize the query
// TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
// We get timeColumnName from time boundary service currently, which only exists for offline table
@@ -739,12 +744,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
Map<String, String> columnNameMap =
_tableCache.isCaseInsensitive() ? _tableCache.getColumnNameMap(rawTableName) : null;
+ Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+ boolean failQueryWhenColumnMismatch =
+ (queryOptions != null && Boolean.parseBoolean(queryOptions.get("failQueryWhenColumnMismatch")));
if (brokerRequest.getFilterSubQueryMap() != null) {
Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
for (FilterQuery filterQuery : values) {
if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
- filterQuery.setColumn(fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap));
+ filterQuery.setColumn(
+ fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap, failQueryWhenColumnMismatch));
}
}
}
@@ -753,14 +762,22 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
// Always read from backward compatible api in AggregationFunctionUtils.
List<String> arguments = AggregationFunctionUtils.getArguments(info);
- arguments.replaceAll(e -> fixColumnName(rawTableName, e, columnNameMap));
+ arguments.replaceAll(e -> {
+ try {
+ return fixColumnName(rawTableName, e, columnNameMap, failQueryWhenColumnMismatch);
+ } catch (Exception ex) {
+ Utils.rethrowException(ex);
+ throw new AssertionError("Should not reach this");
+ }
+ });
info.setExpressions(arguments);
}
}
if (brokerRequest.isSetGroupBy()) {
List<String> expressions = brokerRequest.getGroupBy().getExpressions();
for (int i = 0; i < expressions.size(); i++) {
- expressions.set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap));
+ expressions
+ .set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap, failQueryWhenColumnMismatch));
}
}
} else {
@@ -769,7 +786,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
for (int i = 0; i < selectionColumns.size(); i++) {
String expression = selectionColumns.get(i);
if (!expression.equals("*")) {
- selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap));
+ selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch));
}
}
}
@@ -777,78 +794,98 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
List<SelectionSort> orderBy = brokerRequest.getOrderBy();
for (SelectionSort selectionSort : orderBy) {
String expression = selectionSort.getColumn();
- selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap));
+ selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch));
}
}
PinotQuery pinotQuery = brokerRequest.getPinotQuery();
if (pinotQuery != null) {
for (Expression expression : pinotQuery.getSelectList()) {
- fixColumnName(rawTableName, expression, columnNameMap);
+ fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
}
Expression filterExpression = pinotQuery.getFilterExpression();
if (filterExpression != null) {
- fixColumnName(rawTableName, filterExpression, columnNameMap);
+ fixColumnName(rawTableName, filterExpression, columnNameMap, failQueryWhenColumnMismatch);
}
List<Expression> groupByList = pinotQuery.getGroupByList();
if (groupByList != null) {
for (Expression expression : groupByList) {
- fixColumnName(rawTableName, expression, columnNameMap);
+ fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
}
}
List<Expression> orderByList = pinotQuery.getOrderByList();
if (orderByList != null) {
for (Expression expression : orderByList) {
- fixColumnName(rawTableName, expression, columnNameMap);
+ fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
}
}
Expression havingExpression = pinotQuery.getHavingExpression();
if (havingExpression != null) {
- fixColumnName(rawTableName, havingExpression, columnNameMap);
+ fixColumnName(rawTableName, havingExpression, columnNameMap, failQueryWhenColumnMismatch);
}
}
}
- private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap) {
+ private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap,
+ boolean failQueryWhenColumnMismatch) {
TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression);
- fixColumnName(rawTableName, expressionTree, columnNameMap);
+ fixColumnName(rawTableName, expressionTree, columnNameMap, failQueryWhenColumnMismatch);
return expressionTree.toString();
}
private void fixColumnName(String rawTableName, TransformExpressionTree expression,
- @Nullable Map<String, String> columnNameMap) {
+ @Nullable Map<String, String> columnNameMap, boolean failQueryWhenColumnMismatch) {
TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType();
if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
- expression.setValue(getActualColumnName(rawTableName, expression.getValue(), columnNameMap));
+ expression.setValue(
+ getActualColumnName(rawTableName, expression.getValue(), columnNameMap, failQueryWhenColumnMismatch));
} else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) {
for (TransformExpressionTree child : expression.getChildren()) {
- fixColumnName(rawTableName, child, columnNameMap);
+ fixColumnName(rawTableName, child, columnNameMap, failQueryWhenColumnMismatch);
}
}
}
- private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap) {
+ private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap,
+ boolean throwExceptionWhenColumnNameMismatch) {
ExpressionType expressionType = expression.getType();
if (expressionType == ExpressionType.IDENTIFIER) {
Identifier identifier = expression.getIdentifier();
- identifier.setName(getActualColumnName(rawTableName, identifier.getName(), columnNameMap));
+ identifier.setName(
+ getActualColumnName(rawTableName, identifier.getName(), columnNameMap, throwExceptionWhenColumnNameMismatch));
} else if (expressionType == ExpressionType.FUNCTION) {
- for (Expression operand : expression.getFunctionCall().getOperands()) {
- fixColumnName(rawTableName, operand, columnNameMap);
+ String operator = expression.getFunctionCall().getOperator();
+ List<Expression> expressions = expression.getFunctionCall().getOperands();
+ if ("AS".equals(operator)) {
+ fixColumnName(rawTableName, expressions.get(0), columnNameMap, throwExceptionWhenColumnNameMismatch);
+ } else if (!isCountStarFromExpression(expression)) {
+ for (Expression operand : expression.getFunctionCall().getOperands()) {
+ fixColumnName(rawTableName, operand, columnNameMap, throwExceptionWhenColumnNameMismatch);
+ }
}
}
}
- private String getActualColumnName(String rawTableName, String columnName,
- @Nullable Map<String, String> columnNameMap) {
+ private boolean isCountStarFromExpression(Expression expression) {
+ String operator = expression.getFunctionCall().getOperator();
+ List<Expression> expressions = expression.getFunctionCall().getOperands();
+ return "COUNT".equals(operator) && expressions.size() == 1 && "*"
+ .equals(expressions.get(0).getIdentifier().getName());
+ }
+
+ private String getActualColumnName(String rawTableName, String columnName, Map<String, String> columnNameMap,
+ boolean failQueryWhenColumnMismatch) {
// Check if column is in the format of [table_name].[column_name]
String[] splits = StringUtils.split(columnName, ".", 2);
if (_tableCache.isCaseInsensitive()) {
if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) {
columnName = splits[1];
}
- if (columnNameMap != null) {
- return columnNameMap.getOrDefault(columnName, columnName);
+ String actualColumnName = columnNameMap.get(columnName.toLowerCase());
+ if (actualColumnName != null) {
+ return actualColumnName;
+ } else if (failQueryWhenColumnMismatch) {
+ throw new BadQueryRequestException("Invalid column name in the query: " + columnName);
} else {
return columnName;
}
@@ -856,7 +893,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (splits.length == 2 && rawTableName.equals(splits[0])) {
columnName = splits[1];
}
- return columnName;
+ if (!columnNameMap.containsKey(columnName) && failQueryWhenColumnMismatch) {
+ throw new BadQueryRequestException("Invalid column name in the query: " + columnName);
+ } else {
+ return columnName;
+ }
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 10a1544..7e05582 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -37,7 +37,7 @@ public class QueryException {
// TODO: several ProcessingExceptions are never used, clean them up.
public static final int JSON_PARSING_ERROR_CODE = 100;
public static final int JSON_COMPILATION_ERROR_CODE = 101;
- public static final int PQL_PARSING_ERROR_CODE = 150;
+ public static final int QUERY_PARSING_ERROR_CODE = 150;
public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160;
public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
public static final int ACCESS_DENIED_ERROR_CODE = 180;
@@ -66,7 +66,7 @@ public class QueryException {
public static final ProcessingException JSON_PARSING_ERROR = new ProcessingException(JSON_PARSING_ERROR_CODE);
public static final ProcessingException JSON_COMPILATION_ERROR = new ProcessingException(JSON_COMPILATION_ERROR_CODE);
- public static final ProcessingException PQL_PARSING_ERROR = new ProcessingException(PQL_PARSING_ERROR_CODE);
+ public static final ProcessingException QUERY_PARSING_ERROR = new ProcessingException(QUERY_PARSING_ERROR_CODE);
public static final ProcessingException ACCESS_DENIED_ERROR = new ProcessingException(ACCESS_DENIED_ERROR_CODE);
public static final ProcessingException SEGMENT_PLAN_EXECUTION_ERROR =
new ProcessingException(SEGMENT_PLAN_EXECUTION_ERROR_CODE);
@@ -105,7 +105,7 @@ public class QueryException {
static {
JSON_PARSING_ERROR.setMessage("JsonParsingError");
JSON_COMPILATION_ERROR.setMessage("JsonCompilationError");
- PQL_PARSING_ERROR.setMessage("PQLParsingError");
+ QUERY_PARSING_ERROR.setMessage("QueryParsingError");
SEGMENT_PLAN_EXECUTION_ERROR.setMessage("SegmentPlanExecutionError");
COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError");
QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
@@ -168,7 +168,7 @@ public class QueryException {
case QueryException.JSON_COMPILATION_ERROR_CODE:
case QueryException.JSON_PARSING_ERROR_CODE:
case QueryException.QUERY_VALIDATION_ERROR_CODE:
- case QueryException.PQL_PARSING_ERROR_CODE:
+ case QueryException.QUERY_PARSING_ERROR_CODE:
case QueryException.TOO_MANY_REQUESTS_ERROR_CODE:
return true;
default:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index dbd360d..74bb3e8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -216,15 +216,15 @@ public class TableCache {
throws IOException {
Schema schema = SchemaUtils.fromZNRecord(znRecord);
String rawTableName = schema.getSchemaName();
- if (_caseInsensitive) {
- Map<String, String> columnNameMap = new HashMap<>();
- for (String columnName : schema.getColumnNames()) {
+ Map<String, String> columnNameMap = new HashMap<>();
+ for (String columnName : schema.getColumnNames()) {
+ if (_caseInsensitive) {
columnNameMap.put(columnName.toLowerCase(), columnName);
+ } else {
+ columnNameMap.put(columnName, null);
}
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
- } else {
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
}
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
}
private void removeSchema(String path) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
index 4efe455..7c6f617 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
@@ -30,7 +30,7 @@ public class TopAstNode extends BaseAstNode {
public TopAstNode(int count) {
if (count < 0) {
- throw new RuntimeException(QueryException.PQL_PARSING_ERROR);
+ throw new RuntimeException(QueryException.QUERY_PARSING_ERROR);
}
if (count == 0) {
_count = DEFAULT_TOP_N;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 004f863..55c9a35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -167,7 +167,7 @@ public class PinotQueryResource {
brokerRequest.getQuerySource().setTableName(_pinotHelixResourceManager.getActualTableName(inputTableName));
} catch (Exception e) {
LOGGER.error("Caught exception while compiling {} query: {}", querySyntax.toUpperCase(), query, e);
- return QueryException.getException(QueryException.PQL_PARSING_ERROR, e).toString();
+ return QueryException.getException(QueryException.QUERY_PARSING_ERROR, e).toString();
}
String tableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 790968a..bfa5b37 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -59,6 +59,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -504,6 +505,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
queryResponse = postQuery(SELECT_STAR_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
assertEquals(queryResponse.get("selectionResults").get("columns").size(), 79);
+
+ // The query fails if 'failQueryWhenColumnMismatch' is set to true and the query references a non-existing column.
+ ObjectNode payload = JsonUtils.newObjectNode();
+ payload.put("sql", TEST_DEFAULT_COLUMNS_QUERY);
+ payload.put("queryOptions", "failQueryWhenColumnMismatch=true");
+ queryResponse = JsonUtils.stringToJsonNode(sendPostRequest(_brokerBaseApiUrl + "/query", payload.toString()));
+ Assert.assertEquals(queryResponse.get("totalDocs").asLong(), 0);
+ assertNotNull(queryResponse.get("exceptions"));
}
private void reloadDefaultColumns(boolean withExtraColumns)
@@ -532,6 +541,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
try {
JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
// Total docs should not change during reload
+ if (queryResponse.get("totalDocs").asLong() == 0) {
+ return false;
+ }
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
if (withExtraColumns) {
@@ -865,7 +877,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
caseStatementBuilder.append(String.format("WHEN origin = '%s' THEN %d ", origins.get(i), i + 1));
}
caseStatementBuilder.append("ELSE 0 END");
- String sqlQuery = "SELECT origin, " + caseStatementBuilder + " AS origin_code FROM mytable LIMIT 1000";
+ String sqlQuery = "SELECT origin, " + caseStatementBuilder + " as origin_code, AirlineID as aID FROM mytable LIMIT 1000";
JsonNode response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
JsonNode rows = response.get("resultTable").get("rows");
assertEquals(response.get("exceptions").size(), 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org