You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/05/19 00:01:52 UTC
[incubator-pinot] 01/01: Detect invalid column names from
BrokerRequestHandler
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch detect-invalid-columns
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 21fc475c56b54704ea6ee0619fa5e26997dd1322
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue May 18 17:00:56 2021 -0700
Detect invalid column names from BrokerRequestHandler
---
.../requesthandler/BaseBrokerRequestHandler.java | 75 +++++++++++++++++++++-
.../broker/requesthandler/QueryValidationTest.java | 65 ++++++++++++++++++-
.../apache/pinot/common/metrics/BrokerMeter.java | 1 +
.../pinot/common/utils/helix/TableCache.java | 17 ++++-
4 files changed, 152 insertions(+), 6 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bbeff49..42003cc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -321,7 +321,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Validate the request
try {
- validateRequest(pinotQuery, _queryResponseLimit);
+ validateRequest(pinotQuery, _queryResponseLimit, rawTableName, _tableCache.getColumnNames(rawTableName),
+ _brokerMetrics);
} catch (Exception e) {
LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
@@ -1666,9 +1667,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
* <li>Value for 'LIMIT' <= configured value</li>
* <li>Query options must be set to SQL mode</li>
* </ul>
+ * This method also validates the column names referenced by the query, and emits a broker metric if an invalid column name is detected.
*/
@VisibleForTesting
- static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
+ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit, String rawTableName,
+ Set<String> columnNamesFromSchema, BrokerMetrics brokerMetrics) {
// Verify LIMIT
int limit = pinotQuery.getLimit();
if (limit > queryResponseLimit) {
@@ -1682,6 +1685,74 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (!queryOptions.isGroupByModeSQL() || !queryOptions.isResponseFormatSQL()) {
throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL");
}
+
+    // Validate column names from query. The schema column names are null when the table cache holds no schema
+    // for the table; there is then nothing to validate against, so the check is skipped instead of failing the
+    // whole request with a NullPointerException. The query columns are only extracted when a schema is available.
+    if (columnNamesFromSchema != null && !columnNamesFromSchema.containsAll(getColumnsFromQuery(pinotQuery))) {
+      // At least one name used by the query is not part of the schema: count it once per query for this table.
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY, 1L);
+    }
+ }
+
+  /**
+   * Gathers the names referenced by every clause of the given SQL query.
+   * <p>The select list, filter expression, group-by list, order-by list and having expression are inspected in that
+   * order; a clause that is not set on the query is skipped.
+   *
+   * @param pinotQuery pinot query
+   * @return a new set holding every name that {@code computeColumnNamesFromExpression} extracted from the clauses
+   */
+  private static Set<String> getColumnsFromQuery(PinotQuery pinotQuery) {
+    Set<String> referencedColumns = new HashSet<>();
+    // Selection list.
+    if (pinotQuery.isSetSelectList()) {
+      collectColumnNames(pinotQuery.getSelectList(), referencedColumns);
+    }
+    // Filter clause: a single expression tree.
+    if (pinotQuery.isSetFilterExpression()) {
+      computeColumnNamesFromExpression(pinotQuery.getFilterExpression(), referencedColumns);
+    }
+    // Group-by list.
+    if (pinotQuery.isSetGroupByList()) {
+      collectColumnNames(pinotQuery.getGroupByList(), referencedColumns);
+    }
+    // Order-by list.
+    if (pinotQuery.isSetOrderByList()) {
+      collectColumnNames(pinotQuery.getOrderByList(), referencedColumns);
+    }
+    // Having clause: a single expression tree.
+    if (pinotQuery.isSetHavingExpression()) {
+      computeColumnNamesFromExpression(pinotQuery.getHavingExpression(), referencedColumns);
+    }
+    return referencedColumns;
+  }
+
+  /**
+   * Runs {@code computeColumnNamesFromExpression} on each expression of the list, in list order.
+   */
+  private static void collectColumnNames(List<Expression> expressions, Set<String> columnNames) {
+    for (Expression expression : expressions) {
+      computeColumnNamesFromExpression(expression, columnNames);
+    }
+  }
+
+  /**
+   * Recursively collects the column names referenced by an expression.
+   * <ul>
+   *   <li>Identifier: its name is added, except for the {@code *} wildcard.</li>
+   *   <li>{@code AS} function: only the aliased expression (first operand) is visited. The alias (second operand) is
+   *   a name introduced by the query itself, not a column, so it is not collected.</li>
+   *   <li>Any other function: every operand is visited.</li>
+   *   <li>Any other expression type contributes nothing.</li>
+   * </ul>
+   *
+   * @param e expression to inspect
+   * @param columnNames set that the discovered column names are added to
+   */
+  private static void computeColumnNamesFromExpression(Expression e, Set<String> columnNames) {
+    if (e.getType() == ExpressionType.IDENTIFIER) {
+      String name = e.getIdentifier().getName();
+      if (!"*".equals(name)) {
+        columnNames.add(name);
+      }
+    } else if (e.getType() == ExpressionType.FUNCTION) {
+      List<Expression> operands = e.getFunctionCall().getOperands();
+      if (e.getFunctionCall().getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
+        // Collecting the alias would make every aliased query look like it uses an unknown column when the
+        // result is compared against the schema, so only the aliased expression is inspected.
+        // NOTE(review): assumes alias references in other clauses are resolved before validation - confirm.
+        computeColumnNamesFromExpression(operands.get(0), columnNames);
+      } else {
+        for (Expression operand : operands) {
+          computeColumnNamesFromExpression(operand, columnNames);
+        }
+      }
+    }
   }
/**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
index 7dac470..da83df4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
@@ -19,6 +19,11 @@
package org.apache.pinot.broker.requesthandler;
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -131,10 +136,68 @@ public class QueryValidationTest {
private void testUnsupportedSQLQuery(String query, String errorMessage) {
try {
PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
- BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000);
+ BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000, "testTable", null, null);
Assert.fail("Query should have failed");
} catch (Exception e) {
Assert.assertEquals(errorMessage, e.getMessage());
}
}
+
+  /**
+   * Verifies that validating a query increments the INVALID_COLUMN_NAMES_IN_QUERY meter only when the query
+   * references a column that is missing from the schema (column1, column2 and column3 are the known columns).
+   */
+  @Test
+  public void testInvalidColumnNames() {
+    Set<String> columnNamesFromSchema = Sets.newHashSet("column1", "column2", "column3");
+    BrokerMetrics brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+    // Queries that only use known columns (or the wildcard): the meter is expected to stay at zero.
+    String[] validQueries = {
+        "SELECT * FROM testTable LIMIT 100",
+        "SELECT column1 FROM testTable LIMIT 100",
+        "SELECT column1 FROM testTable WHERE column2 = '1' LIMIT 100",
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100",
+        "SELECT COUNT(*) FROM testTable"
+    };
+    for (String validQuery : validQueries) {
+      Assert.assertEquals(getInvalidColumnNamesCount(validQuery, columnNamesFromSchema, brokerMetrics), 0L);
+    }
+
+    // Each query below uses one column that doesn't exist in the schema, in a different clause. The same meter is
+    // read after every query, so the expected count grows by one per query.
+    String[] invalidQueries = {
+        // column4 doesn't exist in the schema (selection and order by)
+        "SELECT column4 FROM testTable WHERE column2 = '1' order by column4 LIMIT 100",
+        // column4 doesn't exist in the schema (aggregation argument)
+        "SELECT SUM(column4), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100",
+        // column5 doesn't exist in the schema (filter)
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column5 = '1' group by column3 order by column1 LIMIT 100",
+        // column6 doesn't exist in the schema (group by)
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column6 order by column1 LIMIT 100",
+        // column7 doesn't exist in the schema (order by)
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column7 LIMIT 100",
+        // column8 doesn't exist in the schema (selection and having)
+        "SELECT SUM(column8), column2 FROM testTable WHERE true GROUP BY column2 HAVING SUM(column8) > 10"
+    };
+    long expectedCount = 0L;
+    for (String invalidQuery : invalidQueries) {
+      expectedCount++;
+      Assert.assertEquals(getInvalidColumnNamesCount(invalidQuery, columnNamesFromSchema, brokerMetrics),
+          expectedCount);
+    }
+  }
+
+  /**
+   * Compiles the query with the SQL group-by mode and response format options appended, validates it against the
+   * given schema column names for table "testTable", and returns the current count of the
+   * INVALID_COLUMN_NAMES_IN_QUERY meter of that table.
+   */
+  private long getInvalidColumnNamesCount(String query, Set<String> columnNamesFromSchema,
+      BrokerMetrics brokerMetrics) {
+    String queryWithOptions = query + "option(groupByMode=sql,responseFormat=sql)";
+    PinotQuery compiledQuery = CalciteSqlParser.compileToPinotQuery(queryWithOptions);
+    BaseBrokerRequestHandler.validateRequest(compiledQuery, 1000, "testTable", columnNamesFromSchema, brokerMetrics);
+    // Read the meter back right after validation so the caller sees the effect of this query.
+    return brokerMetrics.getMeteredTableValue("testTable", BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY).count();
+  }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index a6411a7..f0ff9a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -86,6 +86,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
GROUP_BY_SIZE("queries", false),
TOTAL_SERVER_RESPONSE_SIZE("queries", false),
+ INVALID_COLUMN_NAMES_IN_QUERY("queries", false),
QUERY_QUOTA_EXCEEDED("exceptions", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4709cca..a796648 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
@@ -131,6 +132,14 @@ public class TableCache {
}
/**
+ * Returns a set of column names given the raw table name, or {@code null} if the table schema does not exist.
+ */
+  @Nullable
+  public Set<String> getColumnNames(String rawTableName) {
+    // No cached schema info for the table means there are no column names to report: callers get null and must
+    // handle it, which the @Nullable annotation makes explicit.
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNames : null;
+  }
+
+ /**
* Returns the table config for the given table, or {@code null} if it does not exist.
*/
@Nullable
@@ -221,9 +230,9 @@ public class TableCache {
for (String columnName : schema.getColumnNames()) {
columnNameMap.put(columnName.toLowerCase(), columnName);
}
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap, schema.getColumnNames()));
} else {
- _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+ _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null, schema.getColumnNames()));
}
}
@@ -316,10 +325,12 @@ public class TableCache {
private static class SchemaInfo {
final Schema _schema;
final Map<String, String> _columnNameMap;
+ final Set<String> _columnNames;
- private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+ private SchemaInfo(Schema schema, Map<String, String> columnNameMap, Set<String> columnNames) {
_schema = schema;
_columnNameMap = columnNameMap;
+ _columnNames = columnNames;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org