You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/07/16 07:18:27 UTC
[incubator-pinot] branch master updated: Allow Pinot to accept
query with FROM clause in the format of [database].[table] (#5707)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 48d9ca7 Allow Pinot to accept query with FROM clause in the format of [database].[table] (#5707)
48d9ca7 is described below
commit 48d9ca72c97e8689da238052dfc38c33100932fe
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Thu Jul 16 00:18:14 2020 -0700
Allow Pinot to accept query with FROM clause in the format of [database].[table] (#5707)
* Allow Pinot to accept query with FROM clause in the format of [database].[table]
* address comments
* Update pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
Co-authored-by: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
---
.../requesthandler/BaseBrokerRequestHandler.java | 42 ++++++++++++++++++++++
.../pinot/common/utils/helix/TableCache.java | 4 +++
.../tests/OfflineClusterIntegrationTest.java | 17 +++++++--
3 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 13e170c..9654554 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -201,6 +201,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
e.getMessage());
}
}
+ updateQuerySource(brokerRequest);
if (_enableCaseInsensitive) {
try {
handleCaseSensitivity(brokerRequest);
@@ -439,6 +440,47 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
+ * Strips the database prefix when the query's table name has the form [database_name].[table_name].
+ *
+ * The table name in the QuerySource is replaced by [table_name] only if [table_name] exists and nothing
+ * exists under the full [database_name].[table_name] name; otherwise the request is left untouched.
+ *
+ * @param brokerRequest broker request whose QuerySource table name is checked and possibly rewritten in place
+ */
+ private void updateQuerySource(BrokerRequest brokerRequest) {
+ String tableName = brokerRequest.getQuerySource().getTableName();
+ // Only a name that splits into exactly two parts on '.' is treated as [database_name].[table_name]
+ String[] tableNameSplits = StringUtils.split(tableName, '.');
+ if (tableNameSplits.length != 2) {
+ return;
+ }
+ // Rewrite to [table_name] only when it exists and the full [database_name].[table_name] does not
+ if (_enableCaseInsensitive) {
+ if (_tableCache.containsTable(tableNameSplits[1]) && !_tableCache.containsTable(tableName)) {
+ // Matched through the TableCache lookup: drop the database prefix.
+ brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+ }
+ return; // case-insensitive mode never falls through to the RoutingManager checks below
+ }
+ // Case-sensitive mode: use RoutingManager to check whether routing exists for the exact table name.
+ if (TableNameBuilder.isTableResource(tableName)) {
+ if (_routingManager.routingExists(tableNameSplits[1]) && !_routingManager.routingExists(tableName)) {
+ brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+ }
+ return;
+ }
+ if (_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableNameSplits[1]))
+ && !_routingManager.routingExists(TableNameBuilder.REALTIME.tableNameWithType(tableName))) {
+ brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+ return;
+ }
+ if (_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableNameSplits[1]))
+ && !_routingManager.routingExists(TableNameBuilder.OFFLINE.tableNameWithType(tableName))) {
+ brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
+ }
+ }
+
+ /**
* Set Log2m value for DistinctCountHLL Function
* @param brokerRequest
* @param hllLog2mOverride
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
index 6f46ac2..4e3dd4d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/TableCache.java
@@ -67,6 +67,10 @@ public class TableCache {
return _tableConfigChangeListener._tableNameMap.getOrDefault(tableName.toLowerCase(), tableName);
}
+ public boolean containsTable(String tableName) {
+ return _tableConfigChangeListener._tableNameMap.containsKey(tableName.toLowerCase()); // name is lower-cased before the lookup; presumably map keys are stored lower-cased, making this case-insensitive - confirm
+ }
+
public String getActualColumnName(String tableName, String columnName) {
String schemaName = _tableConfigChangeListener._table2SchemaConfigMap.get(tableName.toLowerCase());
if (schemaName != null) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 774f209..4408ce2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1133,7 +1133,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
public void testCaseInsensitivity() {
int daysSinceEpoch = 16138;
long secondsSinceEpoch = 16138 * 24 * 60 * 60;
- List<String> queries = Arrays.asList("SELECT * FROM mytable",
+ List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
@@ -1142,7 +1142,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
- queries.replaceAll(query -> query.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch"));
+ List<String> queries = new ArrayList<>();
+ baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
+ baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
// Wait for at most 10 seconds for broker to get the ZK callback of the schema change
TestUtils.waitForCondition(aVoid -> {
@@ -1162,6 +1164,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
+ public void testQuerySourceWithDatabaseName()
+ throws Exception {
+ // By default only 10 rows are returned, so use a high LIMIT
+ String pql = "SELECT DISTINCT(Carrier) FROM mytable LIMIT 1000000";
+ String sql = "SELECT DISTINCT Carrier FROM mytable";
+ testQuery(pql, Collections.singletonList(sql));
+ pql = "SELECT DISTINCT Carrier FROM db.mytable LIMIT 1000000"; // same table with a "db." prefix; note the reused variable "pql" is passed to testSqlQuery below
+ testSqlQuery(pql, Collections.singletonList(sql));
+ }
+
+ @Test
public void testDistinctCountHll()
throws Exception {
String query;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org