You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/05/19 00:01:52 UTC

[incubator-pinot] 01/01: Detect invalid column names from BrokerRequestHandler

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch detect-invalid-columns
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 21fc475c56b54704ea6ee0619fa5e26997dd1322
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue May 18 17:00:56 2021 -0700

    Detect invalid column names from BrokerRequestHandler
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 75 +++++++++++++++++++++-
 .../broker/requesthandler/QueryValidationTest.java | 65 ++++++++++++++++++-
 .../apache/pinot/common/metrics/BrokerMeter.java   |  1 +
 .../pinot/common/utils/helix/TableCache.java       | 17 ++++-
 4 files changed, 152 insertions(+), 6 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bbeff49..42003cc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -321,7 +321,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     // Validate the request
     try {
-      validateRequest(pinotQuery, _queryResponseLimit);
+      validateRequest(pinotQuery, _queryResponseLimit, rawTableName, _tableCache.getColumnNames(rawTableName),
+          _brokerMetrics);
     } catch (Exception e) {
       LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
       requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
@@ -1666,9 +1667,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
    *   <li>Value for 'LIMIT' <= configured value</li>
    *   <li>Query options must be set to SQL mode</li>
    * </ul>
+   * Also checks the query's column names against the schema and emits a broker metric if any is invalid.
    */
   @VisibleForTesting
-  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
+  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit, String rawTableName,
+      Set<String> columnNamesFromSchema, BrokerMetrics brokerMetrics) {
     // Verify LIMIT
     int limit = pinotQuery.getLimit();
     if (limit > queryResponseLimit) {
@@ -1682,6 +1685,74 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (!queryOptions.isGroupByModeSQL() || !queryOptions.isResponseFormatSQL()) {
       throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL");
     }
+
+    // Validate column names from query
+    Set<String> columnsFromQuery = getColumnsFromQuery(pinotQuery);
+    if (!columnNamesFromSchema.containsAll(columnsFromQuery)) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY, 1L);
+    }
+  }
+
+  /**
+   * Fetch column names from the SQL query.
+   * @param pinotQuery pinot query
+   */
+  private static Set<String> getColumnsFromQuery(PinotQuery pinotQuery) {
+    Set<String> columnsFromQuery = new HashSet<>();
+    // Fetch columns from selection list.
+    if (pinotQuery.isSetSelectList()) {
+      List<Expression> selectionList = pinotQuery.getSelectList();
+      for (Expression expression : selectionList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    // Fetch columns from filter clause.
+    if (pinotQuery.isSetFilterExpression()) {
+      Expression filterExpression = pinotQuery.getFilterExpression();
+      computeColumnNamesFromExpression(filterExpression, columnsFromQuery);
+    }
+    // Fetch columns from group by list.
+    if (pinotQuery.isSetGroupByList()) {
+      List<Expression> groupByList = pinotQuery.getGroupByList();
+      for (Expression expression : groupByList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    // Fetch columns from order by list.
+    if (pinotQuery.isSetOrderByList()) {
+      List<Expression> orderByList = pinotQuery.getOrderByList();
+      for (Expression expression : orderByList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    if (pinotQuery.isSetHavingExpression()) {
+      Expression havingExpression = pinotQuery.getHavingExpression();
+      computeColumnNamesFromExpression(havingExpression, columnsFromQuery);
+    }
+    return columnsFromQuery;
+  }
+
+  /**
+   * Fetch column names from an expression.
+   */
+  private static void computeColumnNamesFromExpression(Expression e, Set<String> columnNames) {
+    if (e.getType() == ExpressionType.IDENTIFIER) {
+      Identifier identifier = e.getIdentifier();
+      if (!"*".equals(identifier.getName())) {
+        columnNames.add(identifier.getName());
+      }
+    } else if (e.getType() == ExpressionType.FUNCTION) {
+      if (e.getFunctionCall().getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
+        String columnName = e.getFunctionCall().getOperands().get(1).getIdentifier().getName();
+        computeColumnNamesFromExpression(e.getFunctionCall().getOperands().get(0), columnNames);
+        columnNames.add(columnName);
+      } else {
+        List<Expression> expressions = e.getFunctionCall().getOperands();
+        for (Expression expression : expressions) {
+          computeColumnNamesFromExpression(expression, columnNames);
+        }
+      }
+    }
   }
 
   /**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
index 7dac470..da83df4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.pinot.broker.requesthandler;
 
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -131,10 +136,68 @@ public class QueryValidationTest {
   private void testUnsupportedSQLQuery(String query, String errorMessage) {
     try {
       PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-      BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000);
+      BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000, "testTable", null, null);
       Assert.fail("Query should have failed");
     } catch (Exception e) {
       Assert.assertEquals(errorMessage, e.getMessage());
     }
   }
+
+  @Test
+  public void testInvalidColumnNames() {
+    BrokerMetrics brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    Set<String> columnNamesFromSchema = Sets.newHashSet("column1", "column2", "column3");
+
+    String sql = "SELECT * FROM testTable LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT column1 FROM testTable LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT column1 FROM testTable WHERE column2 = '1' LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT COUNT(*) FROM testTable";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    long metricCount = 0L;
+    // column4 doesn't exist in the schema
+    sql = "SELECT column4 FROM testTable WHERE column2 = '1' order by column4 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column4 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column4), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column5 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column5 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column6 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column6 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column7 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column7 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column8 doesn't exist in the schema
+    sql = "SELECT SUM(column8), column2 FROM testTable WHERE true GROUP BY column2 HAVING SUM(column8) > 10";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+  }
+
+  private long getInvalidColumnNamesCount(String query, Set<String> columnNamesFromSchema,
+      BrokerMetrics brokerMetrics) {
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query + "option(groupByMode=sql,responseFormat=sql)");
+    BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000, "testTable", columnNamesFromSchema, brokerMetrics);
+    return brokerMetrics.getMeteredTableValue("testTable", BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY).count();
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index a6411a7..f0ff9a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -86,6 +86,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
 
   GROUP_BY_SIZE("queries", false),
   TOTAL_SERVER_RESPONSE_SIZE("queries", false),
+  INVALID_COLUMN_NAMES_IN_QUERY("queries", false),
 
   QUERY_QUOTA_EXCEEDED("exceptions", false),
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4709cca..a796648 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
@@ -131,6 +132,14 @@ public class TableCache {
   }
 
   /**
+   * Returns a set of column names given the raw table name, or {@code null} if the table schema does not exist.
+   */
+  public Set<String> getColumnNames(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNames : null;
+  }
+
+  /**
    * Returns the table config for the given table, or {@code null} if it does not exist.
    */
   @Nullable
@@ -221,9 +230,9 @@ public class TableCache {
       for (String columnName : schema.getColumnNames()) {
         columnNameMap.put(columnName.toLowerCase(), columnName);
       }
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap, schema.getColumnNames()));
     } else {
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null, schema.getColumnNames()));
     }
   }
 
@@ -316,10 +325,12 @@ public class TableCache {
   private static class SchemaInfo {
     final Schema _schema;
     final Map<String, String> _columnNameMap;
+    final Set<String> _columnNames;
 
-    private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+    private SchemaInfo(Schema schema, Map<String, String> columnNameMap, Set<String> columnNames) {
       _schema = schema;
       _columnNameMap = columnNameMap;
+      _columnNames = columnNames;
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org