You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/01 19:01:58 UTC
[incubator-pinot] branch master updated: Add IN_SUBQUERY support
(#6022)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d586801 Add IN_SUBQUERY support (#6022)
d586801 is described below
commit d586801a29f33ea6852c7a8e86618fd409b42ae2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 11:01:43 2020 -0800
Add IN_SUBQUERY support (#6022)
Add `IN_SUBQUERY` transform function to support `IDSET` aggregation function as the subquery. The subquery is handled as a separate query on broker side.
E.g. The following 2 queries can be combined into one query:
`SELECT ID_SET(col) FROM table WHERE date = 20200901`
`SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_ID_SET(col, '<serializedIdSet>') = 1 GROUP BY date`
->
`SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_SUBQUERY(col, 'SELECT ID_SET(col) FROM table WHERE date = 20200901') = 1 GROUP BY date`
---
.../requesthandler/BaseBrokerRequestHandler.java | 193 ++++++++++++++++++---
.../tests/BaseClusterIntegrationTestSet.java | 43 +++--
2 files changed, 198 insertions(+), 38 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 3dfa4f1..ab90ccb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -19,7 +19,9 @@
package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
@@ -48,6 +50,7 @@ import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
@@ -75,6 +78,7 @@ import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
import org.apache.pinot.core.requesthandler.PinotQueryRequest;
@@ -89,9 +93,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@SuppressWarnings("UnstableApiUsage")
@ThreadSafe
public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerRequestHandler.class);
+ private static final String IN_SUBQUERY = "inSubquery";
protected final PinotConfiguration _config;
protected final RoutingManager _routingManager;
@@ -160,7 +166,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
- @SuppressWarnings("Duplicates")
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestStatistics requestStatistics)
@@ -186,6 +191,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
}
+ setOptions(requestId, query, request, brokerRequest);
+
if (isLiteralOnlyQuery(brokerRequest)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
try {
@@ -197,11 +204,27 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
e.getMessage());
}
}
+
+ // TODO:
+ // Separate the query rewrite for SQL and PQL and only rewrite one format (PinotQuery for SQL, BrokerRequest for
+ // PQL) to save the unnecessary overhead.
+ // Prerequisite:
+ // Support filter optimizer for SQL. Currently the filter is always picked from the BrokerRequest.
+
+ try {
+ handleSubquery(brokerRequest, request, requesterIdentity, requestStatistics);
+ } catch (Exception e) {
+ LOGGER
+ .info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, e.getMessage());
+ requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+ return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+ }
+
updateTableName(brokerRequest);
try {
updateColumnNames(brokerRequest);
} catch (Exception e) {
- LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
+ LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e.getMessage());
}
if (_defaultHllLog2m > 0) {
handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
@@ -284,9 +307,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
}
- // Set extra settings into broker request
- setOptions(requestId, query, request, brokerRequest);
-
// Optimize the query
// TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
// We get timeColumnName from time boundary service currently, which only exists for offline table
@@ -437,6 +457,138 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
+ * Handles the subquery in the given broker request.
+ * <p>Currently only supports subquery within the filter.
+ */
+ private void handleSubquery(BrokerRequest brokerRequest, JsonNode jsonRequest,
+     @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+     throws Exception {
+   // Filter columns are kept as strings: compile each one into an expression tree, resolve any subquery inside
+   // the tree, then write the (possibly rewritten) expression back as the column.
+   FilterQueryMap filterSubQueryMap = brokerRequest.getFilterSubQueryMap();
+   if (filterSubQueryMap != null) {
+     for (FilterQuery filterQuery : filterSubQueryMap.getFilterQueryMap().values()) {
+       String column = filterQuery.getColumn();
+       if (column == null) {
+         continue;
+       }
+       TransformExpressionTree columnExpression = TransformExpressionTree.compileToExpressionTree(column);
+       handleSubquery(columnExpression, jsonRequest, requesterIdentity, requestStatistics);
+       filterQuery.setColumn(columnExpression.toString());
+     }
+   }
+
+   // The PinotQuery filter is already an expression, so it is rewritten in place.
+   PinotQuery pinotQuery = brokerRequest.getPinotQuery();
+   if (pinotQuery == null) {
+     return;
+   }
+   Expression filterExpression = pinotQuery.getFilterExpression();
+   if (filterExpression == null) {
+     return;
+   }
+   handleSubquery(filterExpression, jsonRequest, requesterIdentity, requestStatistics);
+ }
+
+ /**
+ * Handles the subquery in the given PQL expression.
+ * <p>When subquery is detected, first executes the subquery and gets the response, then rewrites the expression with
+ * the subquery response.
+ * <p>Currently only supports ID_SET subquery within the IN_SUBQUERY transform function, which will be rewritten to an
+ * IN_ID_SET transform function.
+ */
+ private void handleSubquery(TransformExpressionTree expression, JsonNode jsonRequest,
+     @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+     throws Exception {
+   // Only function nodes can be, or contain, an IN_SUBQUERY call.
+   if (expression.getExpressionType() != TransformExpressionTree.ExpressionType.FUNCTION) {
+     return;
+   }
+   List<TransformExpressionTree> children = expression.getChildren();
+
+   // Underscores are stripped and case is ignored when matching the function name.
+   String functionName = StringUtils.remove(expression.getValue(), '_');
+   if (!functionName.equalsIgnoreCase(IN_SUBQUERY)) {
+     // Not an IN_SUBQUERY call itself: look for one among the arguments.
+     for (TransformExpressionTree child : children) {
+       handleSubquery(child, jsonRequest, requesterIdentity, requestStatistics);
+     }
+     return;
+   }
+
+   Preconditions.checkState(children.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
+   TransformExpressionTree subqueryExpression = children.get(1);
+   Preconditions.checkState(subqueryExpression.getExpressionType() == TransformExpressionTree.ExpressionType.LITERAL,
+       "Second argument of IN_SUBQUERY must be a literal (subquery)");
+
+   // Execute the subquery, then turn IN_SUBQUERY(expr, '<subquery>') into INIDSET(expr, '<serializedIdSet>').
+   String subquery = subqueryExpression.getValue();
+   String serializedIdSet =
+       getSerializedIdSetFromSubquery(subquery, jsonRequest, requesterIdentity, requestStatistics);
+   expression.setValue(TransformFunctionType.INIDSET.name());
+   TransformExpressionTree idSetLiteral =
+       new TransformExpressionTree(TransformExpressionTree.ExpressionType.LITERAL, serializedIdSet, null);
+   children.set(1, idSetLiteral);
+ }
+
+ /**
+ * Handles the subquery in the given SQL expression.
+ * <p>When subquery is detected, first executes the subquery and gets the response, then rewrites the expression with
+ * the subquery response.
+ * <p>Currently only supports ID_SET subquery within the IN_SUBQUERY transform function, which will be rewritten to an
+ * IN_ID_SET transform function.
+ */
+ private void handleSubquery(Expression expression, JsonNode jsonRequest,
+     @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+     throws Exception {
+   // Expressions without a function call cannot hold a subquery.
+   Function function = expression.getFunctionCall();
+   if (function == null) {
+     return;
+   }
+   List<Expression> operands = function.getOperands();
+
+   // Underscores are stripped and case is ignored when matching the operator name.
+   String operatorName = StringUtils.remove(function.getOperator(), '_');
+   if (!operatorName.equalsIgnoreCase(IN_SUBQUERY)) {
+     // Not an IN_SUBQUERY call itself: look for one among the operands.
+     for (Expression operand : operands) {
+       handleSubquery(operand, jsonRequest, requesterIdentity, requestStatistics);
+     }
+     return;
+   }
+
+   Preconditions.checkState(operands.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
+   Literal subqueryLiteral = operands.get(1).getLiteral();
+   Preconditions.checkState(subqueryLiteral != null, "Second argument of IN_SUBQUERY must be a literal (subquery)");
+
+   // Execute the subquery, then turn IN_SUBQUERY(expr, '<subquery>') into INIDSET(expr, '<serializedIdSet>').
+   String subquery = subqueryLiteral.getStringValue();
+   String serializedIdSet =
+       getSerializedIdSetFromSubquery(subquery, jsonRequest, requesterIdentity, requestStatistics);
+   function.setOperator(TransformFunctionType.INIDSET.name());
+   Expression idSetLiteral = RequestUtils.getLiteralExpression(serializedIdSet);
+   operands.set(1, idSetLiteral);
+ }
+
+ /**
+ * Returns the result serialized IdSet of the subquery.
+ * <p>The subquery should be an aggregation-only query with one single IdSet aggregation function.
+ */
+ private String getSerializedIdSetFromSubquery(String subquery, JsonNode jsonRequest,
+     @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+     throws Exception {
+   // Make a copy of the query request to construct the subquery request so that they share the query options
+   ObjectNode subqueryRequest = jsonRequest.deepCopy();
+
+   // Replace the query text under the same key (SQL or PQL) that the outer request used
+   if (subqueryRequest.has(Broker.Request.SQL)) {
+     subqueryRequest.put(Broker.Request.SQL, subquery);
+   } else {
+     subqueryRequest.put(Broker.Request.PQL, subquery);
+   }
+
+   // The subquery goes through the regular request handling path as a separate query
+   BrokerResponseNative response =
+       (BrokerResponseNative) handleRequest(subqueryRequest, requesterIdentity, requestStatistics);
+   if (response.getExceptionsSize() != 0) {
+     throw new RuntimeException("Caught exception while executing subquery: " + subquery);
+   }
+
+   // Read the first value of the first row (result table) or of the first aggregation result. An empty result is
+   // mapped to null here so that it is reported below with the subquery text instead of surfacing as a bare
+   // IndexOutOfBoundsException.
+   Object idSetValue;
+   if (response.getResultTable() != null) {
+     List<Object[]> rows = response.getResultTable().getRows();
+     idSetValue = (rows.isEmpty() || rows.get(0).length == 0) ? null : rows.get(0)[0];
+   } else if (response.getAggregationResults() != null) {
+     idSetValue =
+         response.getAggregationResults().isEmpty() ? null : response.getAggregationResults().get(0).getValue();
+   } else {
+     throw new RuntimeException("Failed to get serialized IdSet from subquery: " + subquery);
+   }
+
+   // Anything other than a String cannot be a serialized IdSet; fail with a descriptive message instead of a
+   // ClassCastException.
+   if (!(idSetValue instanceof String)) {
+     throw new RuntimeException("Failed to get serialized IdSet from subquery: " + subquery);
+   }
+   String serializedIdSet = (String) idSetValue;
+
+   // Validate on the broker that the value deserializes into an IdSet before rewriting the outer query with it
+   try {
+     Preconditions.checkNotNull(IdSets.fromBase64String(serializedIdSet));
+   } catch (Exception e) {
+     // Keep the original failure as the cause so the root problem is not lost
+     throw new RuntimeException(
+         "Invalid serialized IdSet: " + serializedIdSet + " returned from the subquery: " + subquery, e);
+   }
+
+   return serializedIdSet;
+ }
+
+ /**
* Check if table is in the format of [database_name].[table_name].
*
* Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name],
@@ -499,11 +651,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Set Log2m value for DistinctCountHLL Function
- * @param brokerRequest
- * @param hllLog2mOverride
+ * Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
*/
- static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
+ private static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
if (brokerRequest.getAggregationsInfo() != null) {
for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
switch (aggregationInfo.getAggregationType().toUpperCase()) {
@@ -524,6 +674,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
+ /**
+ * Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
+ */
private static void updateDistinctCountHllExpr(Expression expr, int hllLog2mOverride) {
Function functionCall = expr.getFunctionCall();
if (functionCall == null) {
@@ -547,11 +700,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Reset limit for selection query if it exceeds maxQuerySelectionLimit.
- * @param brokerRequest
- * @param queryLimit
- *
+ * Overrides the LIMIT/TOP of the query if it exceeds the query limit.
*/
+ @VisibleForTesting
static void handleQueryLimitOverride(BrokerRequest brokerRequest, int queryLimit) {
if (queryLimit > 0) {
// Handle GroupBy for BrokerRequest
@@ -576,7 +727,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given broker request.
+ * Rewrites 'DistinctCount' to 'DistinctCountBitmap' for the given broker request.
*/
private static void handleDistinctCountBitmapOverride(BrokerRequest brokerRequest) {
List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
@@ -607,7 +758,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given expression.
+ * Rewrites 'DistinctCount' with 'DistinctCountBitmap' for the given expression.
*/
private static void handleDistinctCountBitmapOverride(Expression expression) {
Function function = expression.getFunctionCall();
@@ -625,10 +776,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Check if a SQL parsed BrokerRequest is a literal only query.
- * @param brokerRequest
- * @return true if this query selects only Literals
- *
+ * Returns {@code true} if the given query contains only literals, {@code false} otherwise.
*/
@VisibleForTesting
static boolean isLiteralOnlyQuery(BrokerRequest brokerRequest) {
@@ -644,12 +792,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
/**
- * Compute BrokerResponse for literal only query.
- *
- * @param brokerRequest
- * @param compilationStartTimeNs
- * @param requestStatistics
- * @return BrokerResponse
+ * Processes the literal only query.
*/
private BrokerResponse processLiteralOnlyBrokerRequest(BrokerRequest brokerRequest, long compilationStartTimeNs,
RequestStatistics requestStatistics)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index b0a6ed2..4195674 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -254,19 +254,36 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
testSqlQuery(query, Collections.singletonList(query));
// IN_ID_SET
- IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
- idSet.add(19690L);
- idSet.add(20355L);
- idSet.add(21171L);
- // Also include a non-existing id
- idSet.add(0L);
- String serializedIdSet = idSet.toBase64String();
- String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
- String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
- testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
- String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
- String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
- testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
+ {
+ IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
+ idSet.add(19690L);
+ idSet.add(20355L);
+ idSet.add(21171L);
+ // Also include a non-existing id
+ idSet.add(0L);
+ String serializedIdSet = idSet.toBase64String();
+ String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
+ String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
+ testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
+ String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
+ String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
+ testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
+ }
+
+ // IN_SUBQUERY
+ {
+ String inSubqueryQuery =
+ "SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 1";
+ String inQuery =
+ "SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+ testSqlQuery(inSubqueryQuery, Collections.singletonList(inQuery));
+
+ String notInSubqueryQuery =
+ "SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 0";
+ String notInQuery =
+ "SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+ testSqlQuery(notInSubqueryQuery, Collections.singletonList(notInQuery));
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org