You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/22 22:48:10 UTC

[incubator-pinot] branch enable_table_cache_and_check_on_alias_table_column_name updated (2ca2e18 -> 2c1102e)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch enable_table_cache_and_check_on_alias_table_column_name
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 2ca2e18  Adding column name rewrite for the identifiers in the format of [table_name].[column_name]
     new 2c1102e  Adding column name rewrite for the identifiers in the format of [table_name].[column_name]

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (2ca2e18)
            \
             N -- N -- N   refs/heads/enable_table_cache_and_check_on_alias_table_column_name (2c1102e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../requesthandler/BaseBrokerRequestHandler.java   | 31 ++++++++++++----------
 1 file changed, 17 insertions(+), 14 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding column name rewrite for the identifiers in the format of [table_name].[column_name]

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch enable_table_cache_and_check_on_alias_table_column_name
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2c1102ec56c0d865721312a79a2db9e42632691a
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jul 22 15:45:42 2020 -0700

    Adding column name rewrite for the identifiers in the format of [table_name].[column_name]
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 83 ++++++++++++++--------
 .../tests/OfflineClusterIntegrationTest.java       | 68 ++++++++++++++++++
 2 files changed, 121 insertions(+), 30 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 9654554..635d03a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -202,12 +202,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       }
     }
     updateQuerySource(brokerRequest);
-    if (_enableCaseInsensitive) {
-      try {
-        handleCaseSensitivity(brokerRequest);
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while rewriting PQL to make it case-insensitive {}: {}, {}", requestId, query, e);
-      }
+    try {
+      handleUpdateColumnNames(brokerRequest);
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while rewriting Column names in Pinot Query {}: {}, {}", requestId, query, e);
     }
     if (_defaultHllLog2m > 0) {
       handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
@@ -667,19 +665,22 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Fixes the case-insensitive column names to the actual column names in the given broker request.
+   * Fixes the column names to the actual column names in the given broker request.
    */
-  private void handleCaseSensitivity(BrokerRequest brokerRequest) {
-    String inputTableName = brokerRequest.getQuerySource().getTableName();
-    String actualTableName = _tableCache.getActualTableName(inputTableName);
-    brokerRequest.getQuerySource().setTableName(actualTableName);
+  private void handleUpdateColumnNames(BrokerRequest brokerRequest) {
+    if (_enableCaseInsensitive) {
+      String inputTableName = brokerRequest.getQuerySource().getTableName();
+      String actualTableName = _tableCache.getActualTableName(inputTableName);
+      brokerRequest.getQuerySource().setTableName(actualTableName);
+    }
+    String tableName = brokerRequest.getQuerySource().getTableName();
     //fix columns
     if (brokerRequest.getFilterSubQueryMap() != null) {
       Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
       for (FilterQuery filterQuery : values) {
         if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
           String expression = filterQuery.getColumn();
-          filterQuery.setColumn(fixColumnNameCase(actualTableName, expression));
+          filterQuery.setColumn(fixColumnName(tableName, expression));
         }
       }
     }
@@ -688,14 +689,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
           // Always read from backward compatible api in AggregationFunctionUtils.
           List<String> arguments = AggregationFunctionUtils.getArguments(info);
-          arguments.replaceAll(e -> fixColumnNameCase(actualTableName, e));
+          arguments.replaceAll(e -> fixColumnName(tableName, e));
           info.setExpressions(arguments);
         }
       }
       if (brokerRequest.isSetGroupBy()) {
         List<String> expressions = brokerRequest.getGroupBy().getExpressions();
         for (int i = 0; i < expressions.size(); i++) {
-          expressions.set(i, fixColumnNameCase(actualTableName, expressions.get(i)));
+          expressions.set(i, fixColumnName(tableName, expressions.get(i)));
         }
       }
     } else {
@@ -704,7 +705,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       for (int i = 0; i < selectionColumns.size(); i++) {
         String expression = selectionColumns.get(i);
         if (!expression.equals("*")) {
-          selectionColumns.set(i, fixColumnNameCase(actualTableName, expression));
+          selectionColumns.set(i, fixColumnName(tableName, expression));
         }
       }
     }
@@ -712,66 +713,88 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       List<SelectionSort> orderBy = brokerRequest.getOrderBy();
       for (SelectionSort selectionSort : orderBy) {
         String expression = selectionSort.getColumn();
-        selectionSort.setColumn(fixColumnNameCase(actualTableName, expression));
+        selectionSort.setColumn(fixColumnName(tableName, expression));
       }
     }
 
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
     if (pinotQuery != null) {
-      pinotQuery.getDataSource().setTableName(actualTableName);
+      pinotQuery.getDataSource().setTableName(tableName);
       for (Expression expression : pinotQuery.getSelectList()) {
-        fixColumnNameCase(actualTableName, expression);
+        fixColumnName(tableName, expression);
       }
       Expression filterExpression = pinotQuery.getFilterExpression();
       if (filterExpression != null) {
-        fixColumnNameCase(actualTableName, filterExpression);
+        fixColumnName(tableName, filterExpression);
       }
       List<Expression> groupByList = pinotQuery.getGroupByList();
       if (groupByList != null) {
         for (Expression expression : groupByList) {
-          fixColumnNameCase(actualTableName, expression);
+          fixColumnName(tableName, expression);
         }
       }
       List<Expression> orderByList = pinotQuery.getOrderByList();
       if (orderByList != null) {
         for (Expression expression : orderByList) {
-          fixColumnNameCase(actualTableName, expression);
+          fixColumnName(tableName, expression);
         }
       }
       Expression havingExpression = pinotQuery.getHavingExpression();
       if (havingExpression != null) {
-        fixColumnNameCase(actualTableName, havingExpression);
+        fixColumnName(tableName, havingExpression);
       }
     }
   }
 
-  private String fixColumnNameCase(String tableNameWithType, String expression) {
+  private String fixColumnName(String tableNameWithType, String expression) {
     TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression);
-    fixColumnNameCase(tableNameWithType, expressionTree);
+    fixColumnName(tableNameWithType, expressionTree);
     return expressionTree.toString();
   }
 
-  private void fixColumnNameCase(String tableNameWithType, TransformExpressionTree expression) {
+  private void fixColumnName(String tableNameWithType, TransformExpressionTree expression) {
     TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType();
     if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
-      expression.setValue(_tableCache.getActualColumnName(tableNameWithType, expression.getValue()));
+      String identifier = expression.getValue();
+      expression.setValue(getActualColumnName(tableNameWithType, identifier));
     } else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) {
       for (TransformExpressionTree child : expression.getChildren()) {
-        fixColumnNameCase(tableNameWithType, child);
+        fixColumnName(tableNameWithType, child);
       }
     }
   }
 
-  private void fixColumnNameCase(String tableNameWithType, Expression expression) {
+  private void fixColumnName(String tableNameWithType, Expression expression) {
     ExpressionType expressionType = expression.getType();
     if (expressionType == ExpressionType.IDENTIFIER) {
       Identifier identifier = expression.getIdentifier();
-      identifier.setName(_tableCache.getActualColumnName(tableNameWithType, identifier.getName()));
+      identifier.setName(getActualColumnName(tableNameWithType, identifier.getName()));
     } else if (expressionType == ExpressionType.FUNCTION) {
       for (Expression operand : expression.getFunctionCall().getOperands()) {
-        fixColumnNameCase(tableNameWithType, operand);
+        fixColumnName(tableNameWithType, operand);
+      }
+    }
+  }
+
+  private String getActualColumnName(String tableNameWithType, String columnName) {
+    String[] splits = StringUtils.split(columnName, ".", 2);
+    if (_enableCaseInsensitive) {
+      if (splits.length == 2) {
+        if (tableNameWithType.equalsIgnoreCase(splits[0]) || TableNameBuilder.extractRawTableName(tableNameWithType)
+            .equalsIgnoreCase(splits[0])) {
+          return _tableCache.getActualColumnName(tableNameWithType, splits[1]);
+        }
+      }
+      return _tableCache.getActualColumnName(tableNameWithType, columnName);
+    } else {
+      if (splits.length == 2) {
+        if (tableNameWithType.equals(splits[0]) || TableNameBuilder.extractRawTableName(tableNameWithType)
+            .equals(splits[0])) {
+          return splits[1];
+        }
       }
     }
+    return columnName;
   }
 
   private static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9a7e49d..076a186 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1180,6 +1180,74 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
+  public void testColumnNameContainsTableName() {
+    int daysSinceEpoch = 16138;
+    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.stream().forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));
+    baseQueries.stream().forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));
+
+    // Wait for at most 10 seconds for broker to get the ZK callback of the schema change
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        for (String query : queries) {
+          JsonNode response = postQuery(query);
+          // NOTE: When table does not exist, we will get 'BrokerResourceMissingError'.
+          //       When column does not exist, all segments will be pruned and 'numSegmentsProcessed' will be 0.
+          return response.get("exceptions").size() == 0 && response.get("numSegmentsProcessed").asInt() > 0;
+        }
+      } catch (Exception e) {
+        // Fail the test when exception caught
+        throw new RuntimeException(e);
+      }
+      return true;
+    }, 10_000L, "Failed to get results for case-insensitive queries");
+  }
+
+  @Test
+  public void testCaseInsensitivityWithColumnNameContainsTableName() {
+    int daysSinceEpoch = 16138;
+    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
+        "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
+        "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
+        "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
+        "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
+    List<String> queries = new ArrayList<>();
+    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+    baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
+
+    // Wait for at most 10 seconds for broker to get the ZK callback of the schema change
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        for (String query : queries) {
+          JsonNode response = postQuery(query);
+          // NOTE: When table does not exist, we will get 'BrokerResourceMissingError'.
+          //       When column does not exist, all segments will be pruned and 'numSegmentsProcessed' will be 0.
+          return response.get("exceptions").size() == 0 && response.get("numSegmentsProcessed").asInt() > 0;
+        }
+      } catch (Exception e) {
+        // Fail the test when exception caught
+        throw new RuntimeException(e);
+      }
+      return true;
+    }, 10_000L, "Failed to get results for case-insensitive queries");
+  }
+
+  @Test
   public void testQuerySourceWithDatabaseName()
       throws Exception {
     // by default 10 rows will be returned, so use high limit


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org