You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/05/19 00:01:51 UTC

[incubator-pinot] branch detect-invalid-columns created (now 21fc475)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch detect-invalid-columns
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 21fc475  Detect invalid column names from BrokerRequestHandler

This branch includes the following new commits:

     new 21fc475  Detect invalid column names from BrokerRequestHandler

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Detect invalid column names from BrokerRequestHandler

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch detect-invalid-columns
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 21fc475c56b54704ea6ee0619fa5e26997dd1322
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue May 18 17:00:56 2021 -0700

    Detect invalid column names from BrokerRequestHandler
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 75 +++++++++++++++++++++-
 .../broker/requesthandler/QueryValidationTest.java | 65 ++++++++++++++++++-
 .../apache/pinot/common/metrics/BrokerMeter.java   |  1 +
 .../pinot/common/utils/helix/TableCache.java       | 17 ++++-
 4 files changed, 152 insertions(+), 6 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bbeff49..42003cc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -321,7 +321,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     // Validate the request
     try {
-      validateRequest(pinotQuery, _queryResponseLimit);
+      validateRequest(pinotQuery, _queryResponseLimit, rawTableName, _tableCache.getColumnNames(rawTableName),
+          _brokerMetrics);
     } catch (Exception e) {
       LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
       requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
@@ -1666,9 +1667,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
    *   <li>Value for 'LIMIT' <= configured value</li>
    *   <li>Query options must be set to SQL mode</li>
    * </ul>
+   * This method also validates column names from the query and emits a broker metric if an invalid name is detected.
    */
   @VisibleForTesting
-  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
+  static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit, String rawTableName,
+      Set<String> columnNamesFromSchema, BrokerMetrics brokerMetrics) {
     // Verify LIMIT
     int limit = pinotQuery.getLimit();
     if (limit > queryResponseLimit) {
@@ -1682,6 +1685,74 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     if (!queryOptions.isGroupByModeSQL() || !queryOptions.isResponseFormatSQL()) {
       throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL");
     }
+
+    // Validate column names from query. Skipped when the schema is unknown (null): TableCache has no column
+    // names for such tables, and an NPE here would make every query against them fail validation.
+    if (columnNamesFromSchema != null && !columnNamesFromSchema.containsAll(getColumnsFromQuery(pinotQuery))) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY, 1L);
+    }
+  }
+
+  /**
+   * Collects the column names referenced by the select, filter, group-by, order-by and having clauses.
+   * @param pinotQuery pinot query
+   */
+  private static Set<String> getColumnsFromQuery(PinotQuery pinotQuery) {
+    Set<String> columnsFromQuery = new HashSet<>();
+    // Fetch columns from selection list.
+    if (pinotQuery.isSetSelectList()) {
+      List<Expression> selectionList = pinotQuery.getSelectList();
+      for (Expression expression : selectionList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    // Fetch columns from filter clause.
+    if (pinotQuery.isSetFilterExpression()) {
+      Expression filterExpression = pinotQuery.getFilterExpression();
+      computeColumnNamesFromExpression(filterExpression, columnsFromQuery);
+    }
+    // Fetch columns from group by list.
+    if (pinotQuery.isSetGroupByList()) {
+      List<Expression> groupByList = pinotQuery.getGroupByList();
+      for (Expression expression : groupByList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    // Fetch columns from order by list.
+    if (pinotQuery.isSetOrderByList()) {
+      List<Expression> orderByList = pinotQuery.getOrderByList();
+      for (Expression expression : orderByList) {
+        computeColumnNamesFromExpression(expression, columnsFromQuery);
+      }
+    }
+    if (pinotQuery.isSetHavingExpression()) {
+      Expression havingExpression = pinotQuery.getHavingExpression();
+      computeColumnNamesFromExpression(havingExpression, columnsFromQuery);
+    }
+    return columnsFromQuery;
+  }
+
+  /**
+   * Recursively adds the identifiers referenced by {@code e} to {@code columnNames}, skipping "*" and AS aliases.
+   */
+  private static void computeColumnNamesFromExpression(Expression e, Set<String> columnNames) {
+    if (e.getType() == ExpressionType.IDENTIFIER) {
+      Identifier identifier = e.getIdentifier();
+      if (!"*".equals(identifier.getName())) {
+        columnNames.add(identifier.getName());
+      }
+    } else if (e.getType() == ExpressionType.FUNCTION) {
+      if (e.getFunctionCall().getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
+        // Only the aliased expression (operand 0) can reference schema columns. The alias (operand 1) is a
+        // query-defined name; collecting it would flag e.g. "SELECT column1 AS c1" as using an invalid column.
+        computeColumnNamesFromExpression(e.getFunctionCall().getOperands().get(0), columnNames);
+      } else {
+        List<Expression> expressions = e.getFunctionCall().getOperands();
+        for (Expression expression : expressions) {
+          computeColumnNamesFromExpression(expression, columnNames);
+        }
+      }
+    }
   }
 
   /**
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
index 7dac470..da83df4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/QueryValidationTest.java
@@ -19,6 +19,11 @@
 
 package org.apache.pinot.broker.requesthandler;
 
+import com.google.common.collect.Sets;
+import java.util.Set;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -131,10 +136,68 @@ public class QueryValidationTest {
   private void testUnsupportedSQLQuery(String query, String errorMessage) {
     try {
       PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-      BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000);
+      BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000, "testTable", null, null);
       Assert.fail("Query should have failed");
     } catch (Exception e) {
       Assert.assertEquals(errorMessage, e.getMessage());
     }
   }
+
+  @Test
+  public void testInvalidColumnNames() {
+    BrokerMetrics brokerMetrics = new BrokerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    Set<String> columnNamesFromSchema = Sets.newHashSet("column1", "column2", "column3");
+
+    String sql = "SELECT * FROM testTable LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT column1 FROM testTable LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT column1 FROM testTable WHERE column2 = '1' LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    sql = "SELECT COUNT(*) FROM testTable";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), 0L);
+
+    long metricCount = 0L;
+    // column4 doesn't exist in the schema
+    sql = "SELECT column4 FROM testTable WHERE column2 = '1' order by column4 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column4 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column4), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column5 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column5 = '1' group by column3 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column6 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column6 order by column1 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column7 doesn't exist in the schema
+    sql =
+        "SELECT SUM(column1), COUNT(column3) FROM testTable WHERE column2 = '1' group by column3 order by column7 LIMIT 100";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+
+    // column8 doesn't exist in the schema
+    sql = "SELECT SUM(column8), column2 FROM testTable WHERE true GROUP BY column2 HAVING SUM(column8) > 10";
+    Assert.assertEquals(getInvalidColumnNamesCount(sql, columnNamesFromSchema, brokerMetrics), ++metricCount);
+  }
+
+  private long getInvalidColumnNamesCount(String query, Set<String> columnNamesFromSchema,
+      BrokerMetrics brokerMetrics) {
+    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query + " option(groupByMode=sql,responseFormat=sql)");
+    BaseBrokerRequestHandler.validateRequest(pinotQuery, 1000, "testTable", columnNamesFromSchema, brokerMetrics);
+    return brokerMetrics.getMeteredTableValue("testTable", BrokerMeter.INVALID_COLUMN_NAMES_IN_QUERY).count();
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index a6411a7..f0ff9a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -86,6 +86,7 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
 
   GROUP_BY_SIZE("queries", false),
   TOTAL_SERVER_RESPONSE_SIZE("queries", false),
+  INVALID_COLUMN_NAMES_IN_QUERY("queries", false), // Queries referencing a column not in the table schema
 
   QUERY_QUOTA_EXCEEDED("exceptions", false),
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 4709cca..a796648 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
@@ -131,6 +132,15 @@ public class TableCache {
   }
 
   /**
+   * Returns a set of column names given the raw table name, or {@code null} if the table schema does not exist.
+   */
+  @Nullable
+  public Set<String> getColumnNames(String rawTableName) {
+    SchemaInfo schemaInfo = _schemaInfoMap.get(rawTableName);
+    return schemaInfo != null ? schemaInfo._columnNames : null;
+  }
+
+  /**
    * Returns the table config for the given table, or {@code null} if it does not exist.
    */
   @Nullable
@@ -221,9 +231,9 @@ public class TableCache {
       for (String columnName : schema.getColumnNames()) {
         columnNameMap.put(columnName.toLowerCase(), columnName);
       }
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap));
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, columnNameMap, schema.getColumnNames()));
     } else {
-      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null));
+      _schemaInfoMap.put(rawTableName, new SchemaInfo(schema, null, schema.getColumnNames()));
     }
   }
 
@@ -316,10 +326,12 @@ public class TableCache {
   private static class SchemaInfo {
     final Schema _schema;
     final Map<String, String> _columnNameMap;
+    final Set<String> _columnNames;
 
-    private SchemaInfo(Schema schema, Map<String, String> columnNameMap) {
+    private SchemaInfo(Schema schema, Map<String, String> columnNameMap, Set<String> columnNames) {
       _schema = schema;
       _columnNameMap = columnNameMap;
+      _columnNames = columnNames;
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org