You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/28 18:25:38 UTC

[incubator-pinot] 01/01: Add option to fail query when column mismatches

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch throw-exception-when-column-mismatch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e0987e566749cf93c2a60f36eb32b241b3b3374b
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Mon Sep 28 11:24:53 2020 -0700

    Add option to fail query when column mismatches
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 99 +++++++++++++++-------
 .../pinot/common/exception/QueryException.java     |  8 +-
 .../pinot/common/utils/helix/TableCache.java       | 12 +--
 .../pinot/pql/parsers/pql2/ast/TopAstNode.java     |  2 +-
 .../api/resources/PinotQueryResource.java          |  2 +-
 .../tests/OfflineClusterIntegrationTest.java       | 14 ++-
 6 files changed, 95 insertions(+), 42 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 4925781..1e9f0b7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -46,6 +46,7 @@ import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -74,6 +75,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.exception.BadQueryRequestException;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
 import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
@@ -182,8 +184,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling request {}: {}, {}", requestId, query, e.getMessage());
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
-      requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
-      return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
+      requestStatistics.setErrorCode(QueryException.QUERY_PARSING_ERROR_CODE);
+      return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, e));
     }
     if (isLiteralOnlyQuery(brokerRequest)) {
       LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
@@ -196,9 +198,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
                 e.getMessage());
       }
     }
+
+    // Set extra settings into broker request
+    setOptions(requestId, query, request, brokerRequest);
+
     updateTableName(brokerRequest);
     try {
       updateColumnNames(brokerRequest);
+    } catch(BadQueryRequestException be) {
+      return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_PARSING_ERROR, be));
     } catch (Exception e) {
       LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
     }
@@ -283,9 +291,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
     }
 
-    // Set extra settings into broker request
-    setOptions(requestId, query, request, brokerRequest);
-
     // Optimize the query
     // TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
     // We get timeColumnName from time boundary service currently, which only exists for offline table
@@ -739,12 +744,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     String rawTableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
     Map<String, String> columnNameMap =
         _tableCache.isCaseInsensitive() ? _tableCache.getColumnNameMap(rawTableName) : null;
+    Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+    boolean failQueryWhenColumnMismatch =
+        (queryOptions != null && Boolean.parseBoolean(queryOptions.get("failQueryWhenColumnMismatch")));
 
     if (brokerRequest.getFilterSubQueryMap() != null) {
       Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
       for (FilterQuery filterQuery : values) {
         if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
-          filterQuery.setColumn(fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap));
+          filterQuery.setColumn(
+              fixColumnName(rawTableName, filterQuery.getColumn(), columnNameMap, failQueryWhenColumnMismatch));
         }
       }
     }
@@ -753,14 +762,22 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
           // Always read from backward compatible api in AggregationFunctionUtils.
           List<String> arguments = AggregationFunctionUtils.getArguments(info);
-          arguments.replaceAll(e -> fixColumnName(rawTableName, e, columnNameMap));
+          arguments.replaceAll(e -> {
+            try {
+              return fixColumnName(rawTableName, e, columnNameMap, failQueryWhenColumnMismatch);
+            } catch (Exception ex) {
+              Utils.rethrowException(ex);
+              throw new AssertionError("Should not reach this");
+            }
+          });
           info.setExpressions(arguments);
         }
       }
       if (brokerRequest.isSetGroupBy()) {
         List<String> expressions = brokerRequest.getGroupBy().getExpressions();
         for (int i = 0; i < expressions.size(); i++) {
-          expressions.set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap));
+          expressions
+              .set(i, fixColumnName(rawTableName, expressions.get(i), columnNameMap, failQueryWhenColumnMismatch));
         }
       }
     } else {
@@ -769,7 +786,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       for (int i = 0; i < selectionColumns.size(); i++) {
         String expression = selectionColumns.get(i);
         if (!expression.equals("*")) {
-          selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap));
+          selectionColumns.set(i, fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch));
         }
       }
     }
@@ -777,78 +794,98 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       for (SelectionSort selectionSort : orderBy) {
         String expression = selectionSort.getColumn();
-        selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap));
+        selectionSort.setColumn(fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch));
       }
     }
 
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     if (pinotQuery != null) {
       for (Expression expression : pinotQuery.getSelectList()) {
-        fixColumnName(rawTableName, expression, columnNameMap);
+        fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
       }
       Expression filterExpression = pinotQuery.getFilterExpression();
       if (filterExpression != null) {
-        fixColumnName(rawTableName, filterExpression, columnNameMap);
+        fixColumnName(rawTableName, filterExpression, columnNameMap, failQueryWhenColumnMismatch);
       }
       List<Expression> groupByList = pinotQuery.getGroupByList();
       if (groupByList != null) {
         for (Expression expression : groupByList) {
-          fixColumnName(rawTableName, expression, columnNameMap);
+          fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
         }
       }
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (orderByList != null) {
         for (Expression expression : orderByList) {
-          fixColumnName(rawTableName, expression, columnNameMap);
+          fixColumnName(rawTableName, expression, columnNameMap, failQueryWhenColumnMismatch);
         }
       }
       Expression havingExpression = pinotQuery.getHavingExpression();
       if (havingExpression != null) {
-        fixColumnName(rawTableName, havingExpression, columnNameMap);
+        fixColumnName(rawTableName, havingExpression, columnNameMap, failQueryWhenColumnMismatch);
       }
     }
   }
 
-  private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap) {
+  private String fixColumnName(String rawTableName, String expression, @Nullable Map<String, String> columnNameMap,
+      boolean failQueryWhenColumnMismatch) {
     TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-    fixColumnName(rawTableName, expressionTree, columnNameMap);
+    fixColumnName(rawTableName, expressionTree, columnNameMap, failQueryWhenColumnMismatch);
     return expressionTree.toString();
   }
 
   private void fixColumnName(String rawTableName, TransformExpressionTree expression,
-      @Nullable Map<String, String> columnNameMap) {
+      @Nullable Map<String, String> columnNameMap, boolean failQueryWhenColumnMismatch) {
     TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType();
     if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-      expression.setValue(getActualColumnName(rawTableName, expression.getValue(), columnNameMap));
+      expression.setValue(
+          getActualColumnName(rawTableName, expression.getValue(), columnNameMap, failQueryWhenColumnMismatch));
     } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) {
       for (TransformExpressionTree child : expression.getChildren()) {
-        fixColumnName(rawTableName, child, columnNameMap);
+        fixColumnName(rawTableName, child, columnNameMap, failQueryWhenColumnMismatch);
       }
     }
   }
 
-  private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap) {
+  private void fixColumnName(String rawTableName, Expression expression, @Nullable Map<String, String> columnNameMap,
+      boolean throwExceptionWhenColumnNameMismatch) {
     ExpressionType expressionType = expression.getType();
     if (expressionType == ExpressionType.IDENTIFIER) {
       Identifier identifier = expression.getIdentifier();
-      identifier.setName(getActualColumnName(rawTableName, identifier.getName(), columnNameMap));
+      identifier.setName(
+          getActualColumnName(rawTableName, identifier.getName(), columnNameMap, throwExceptionWhenColumnNameMismatch));
     } else if (expressionType == ExpressionType.FUNCTION) {
-      for (Expression operand : expression.getFunctionCall().getOperands()) {
-        fixColumnName(rawTableName, operand, columnNameMap);
+      String operator = expression.getFunctionCall().getOperator();
+      List<Expression> expressions = expression.getFunctionCall().getOperands();
+      if ("AS".equals(operator)) {
+        fixColumnName(rawTableName, expressions.get(0), columnNameMap, throwExceptionWhenColumnNameMismatch);
+      } else if (!isCountStarFromExpression(expression)) {
+        for (Expression operand : expression.getFunctionCall().getOperands()) {
+          fixColumnName(rawTableName, operand, columnNameMap, throwExceptionWhenColumnNameMismatch);
+        }
       }
     }
   }
 
-  private String getActualColumnName(String rawTableName, String columnName,
-      @Nullable Map<String, String> columnNameMap) {
+  private boolean isCountStarFromExpression(Expression expression) {
+    String operator = expression.getFunctionCall().getOperator();
+    List<Expression> expressions = expression.getFunctionCall().getOperands();
+    return "COUNT".equals(operator) && expressions.size() == 1 && "*"
+        .equals(expressions.get(0).getIdentifier().getName());
+  }
+
+  private String getActualColumnName(String rawTableName, String columnName, Map<String, String> columnNameMap,
+      boolean failQueryWhenColumnMismatch) {
     // Check if column is in the format of [table_name].[column_name]
     String[] splits = StringUtils.split(columnName, ".", 2);
     if (_tableCache.isCaseInsensitive()) {
       if (splits.length == 2 && rawTableName.equalsIgnoreCase(splits[0])) {
         columnName = splits[1];
       }
-      if (columnNameMap != null) {
-        return columnNameMap.getOrDefault(columnName, columnName);
+      String actualColumnName = columnNameMap.get(columnName.toLowerCase());
+      if (actualColumnName != null) {
+        return actualColumnName;
+      } else if (failQueryWhenColumnMismatch) {
+        throw new BadQueryRequestException("Invalid column name in the query: " + columnName);
       } else {
         return columnName;
       }
@@ -856,7 +893,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       if (splits.length == 2 && rawTableName.equals(splits[0])) {
         columnName = splits[1];
       }
-      return columnName;
+      if (!columnNameMap.containsKey(columnName) && failQueryWhenColumnMismatch) {
+        throw new BadQueryRequestException("Invalid column name in the query: " + columnName);
+      } else {
+        return columnName;
+      }
     }
   }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index 10a1544..7e05582 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -37,7 +37,7 @@ public class QueryException {
   // TODO: several ProcessingExceptions are never used, clean them up.
   public static final int JSON_PARSING_ERROR_CODE = 100;
   public static final int JSON_COMPILATION_ERROR_CODE = 101;
-  public static final int PQL_PARSING_ERROR_CODE = 150;
+  public static final int QUERY_PARSING_ERROR_CODE = 150;
   public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160;
   public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
   public static final int ACCESS_DENIED_ERROR_CODE = 180;
@@ -66,7 +66,7 @@ public class QueryException {
 
   public static final ProcessingException JSON_PARSING_ERROR = new ProcessingException(JSON_PARSING_ERROR_CODE);
   public static final ProcessingException JSON_COMPILATION_ERROR = new ProcessingException(JSON_COMPILATION_ERROR_CODE);
-  public static final ProcessingException PQL_PARSING_ERROR = new ProcessingException(PQL_PARSING_ERROR_CODE);
+  public static final ProcessingException QUERY_PARSING_ERROR = new ProcessingException(QUERY_PARSING_ERROR_CODE);
   public static final ProcessingException ACCESS_DENIED_ERROR = new ProcessingException(ACCESS_DENIED_ERROR_CODE);
   public static final ProcessingException SEGMENT_PLAN_EXECUTION_ERROR =
       new ProcessingException(SEGMENT_PLAN_EXECUTION_ERROR_CODE);
@@ -105,7 +105,7 @@ public class QueryException {
   static {
     JSON_PARSING_ERROR.setMessage("JsonParsingError");
     JSON_COMPILATION_ERROR.setMessage("JsonCompilationError");
-    PQL_PARSING_ERROR.setMessage("PQLParsingError");
+    QUERY_PARSING_ERROR.setMessage("QueryParsingError");
     SEGMENT_PLAN_EXECUTION_ERROR.setMessage("SegmentPlanExecutionError");
     COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR.setMessage("CombineSegmentPlanTimeoutError");
     QUERY_EXECUTION_ERROR.setMessage("QueryExecutionError");
@@ -168,7 +168,7 @@ public class QueryException {
       case QueryException.JSON_COMPILATION_ERROR_CODE:
       case QueryException.JSON_PARSING_ERROR_CODE:
       case QueryException.QUERY_VALIDATION_ERROR_CODE:
-      case QueryException.PQL_PARSING_ERROR_CODE:
+      case QueryException.QUERY_PARSING_ERROR_CODE:
       case QueryException.TOO_MANY_REQUESTS_ERROR_CODE:
         return true;
       default:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index dbd360d..74bb3e8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -216,15 +216,15 @@ public class TableCache {
       throws IOException {
     Schema schema = SchemaUtils.fromZNRecord(znRecord);
     String rawTableName = schema.getSchemaName();
-    if (_caseInsensitive) {
-      Map<String, String> columnNameMap = new HashMap<>();
-      for (String columnName : schema.getColumnNames()) {
+    Map<String, String> columnNameMap = new HashMap<>();
+    for (String columnName : schema.getColumnNames()) {
+      if (_caseInsensitive) {
         columnNameMap.put(columnName.toLowerCase(), columnName);
+      } else {
+        columnNameMap.put(columnName, null);
       }
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
-    } else {
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
     }
+    _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
   }
 
   private void removeSchema(String path) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
index 4efe455..7c6f617 100644
--- a/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
+++ b/pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/TopAstNode.java
@@ -30,7 +30,7 @@ public class TopAstNode extends BaseAstNode {
 
   public TopAstNode(int count) {
     if (count < 0) {
-      throw new RuntimeException(QueryException.PQL_PARSING_ERROR);
+      throw new RuntimeException(QueryException.QUERY_PARSING_ERROR);
     }
     if (count == 0) {
       _count = DEFAULT_TOP_N;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index 004f863..55c9a35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -167,7 +167,7 @@ public class PinotQueryResource {
       brokerRequest.getQuerySource().setTableName(_pinotHelixResourceManager.getActualTableName(inputTableName));
     } catch (Exception e) {
       LOGGER.error("Caught exception while compiling {} query: {}", querySyntax.toUpperCase(), query, e);
-      return QueryException.getException(QueryException.PQL_PARSING_ERROR, e).toString();
+      return QueryException.getException(QueryException.QUERY_PARSING_ERROR, e).toString();
     }
     String tableName = TableNameBuilder.extractRawTableName(brokerRequest.getQuerySource().getTableName());
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 790968a..bfa5b37 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -59,6 +59,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -504,6 +505,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     queryResponse = postQuery(SELECT_STAR_QUERY);
     assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
     assertEquals(queryResponse.get("selectionResults").get("columns").size(), 79);
+
+    // The query fails if 'failQueryWhenColumnMismatch' is set to true and the query references a non-existent column.
+    ObjectNode payload = JsonUtils.newObjectNode();
+    payload.put("sql", TEST_DEFAULT_COLUMNS_QUERY);
+    payload.put("queryOptions", "failQueryWhenColumnMismatch=true");
+    queryResponse = JsonUtils.stringToJsonNode(sendPostRequest(_brokerBaseApiUrl + "/query", payload.toString()));
+    Assert.assertEquals(queryResponse.get("totalDocs").asLong(), 0);
+    assertNotNull(queryResponse.get("exceptions"));
   }
 
   private void reloadDefaultColumns(boolean withExtraColumns)
@@ -532,6 +541,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       try {
         JsonNode queryResponse = postQuery(TEST_DEFAULT_COLUMNS_QUERY);
         // Total docs should not change during reload
+        if (queryResponse.get("totalDocs").asLong() == 0) {
+          return false;
+        }
         assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
         long count = queryResponse.get("aggregationResults").get(0).get("value").asLong();
         if (withExtraColumns) {
@@ -865,7 +877,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       caseStatementBuilder.append(String.format("WHEN origin = '%s' THEN %d ", origins.get(i), i + 1));
     }
     caseStatementBuilder.append("ELSE 0 END");
-    String sqlQuery = "SELECT origin, " + caseStatementBuilder + " AS origin_code FROM mytable LIMIT 1000";
+    String sqlQuery = "SELECT origin, " + caseStatementBuilder + " as origin_code, AirlineID as aID FROM mytable LIMIT 1000";
     JsonNode response = postSqlQuery(sqlQuery, _brokerBaseApiUrl);
     JsonNode rows = response.get("resultTable").get("rows");
     assertEquals(response.get("exceptions").size(), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org