You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/01 19:01:58 UTC

[incubator-pinot] branch master updated: Add IN_SUBQUERY support (#6022)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d586801  Add IN_SUBQUERY support (#6022)
d586801 is described below

commit d586801a29f33ea6852c7a8e86618fd409b42ae2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 11:01:43 2020 -0800

    Add IN_SUBQUERY support (#6022)
    
    Add the `IN_SUBQUERY` transform function, which takes an `IDSET` aggregation query as its subquery. The subquery is handled as a separate query on the broker side.
    
    E.g. The following 2 queries can be combined into one query:
    `SELECT ID_SET(col) FROM table WHERE date = 20200901`
    `SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_ID_SET(col, '<serializedIdSet>') = 1 GROUP BY date`
    ->
    `SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_SUBQUERY(col, 'SELECT ID_SET(col) FROM table WHERE date = 20200901') = 1 GROUP BY date`
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 193 ++++++++++++++++++---
 .../tests/BaseClusterIntegrationTestSet.java       |  43 +++--
 2 files changed, 198 insertions(+), 38 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 3dfa4f1..ab90ccb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
@@ -48,6 +50,7 @@ import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerQueryPhase;
@@ -75,6 +78,7 @@ import org.apache.pinot.common.utils.helix.TableCache;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.utils.idset.IdSets;
 import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
 import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
 import org.apache.pinot.core.requesthandler.PinotQueryRequest;
@@ -89,9 +93,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+@SuppressWarnings("UnstableApiUsage")
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerRequestHandler.class);
+  private static final String IN_SUBQUERY = "inSubquery";
 
   protected final PinotConfiguration _config;
   protected final RoutingManager _routingManager;
@@ -160,7 +166,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
-  @SuppressWarnings("Duplicates")
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
       RequestStatistics requestStatistics)
@@ -186,6 +191,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
       return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
     }
+    setOptions(requestId, query, request, brokerRequest);
+
     if (isLiteralOnlyQuery(brokerRequest)) {
       LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
       try {
@@ -197,11 +204,27 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
                 e.getMessage());
       }
     }
+
+    // TODO:
+    //   Separate the query rewrite for SQL and PQL and only rewrite one format (PinotQuery for SQL, BrokerRequest for
+    //   PQL) to save the unnecessary overhead.
+    //   Prerequisite:
+    //     Support filter optimizer for SQL. Currently the filter is always picked from the BrokerRequest.
+
+    try {
+      handleSubquery(brokerRequest, request, requesterIdentity, requestStatistics);
+    } catch (Exception e) {
+      LOGGER
+          .info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, e.getMessage());
+      requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+      return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+    }
+
     updateTableName(brokerRequest);
     try {
       updateColumnNames(brokerRequest);
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
+      LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e.getMessage());
     }
     if (_defaultHllLog2m > 0) {
       handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
@@ -284,9 +307,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
     }
 
-    // Set extra settings into broker request
-    setOptions(requestId, query, request, brokerRequest);
-
     // Optimize the query
     // TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
     // We get timeColumnName from time boundary service currently, which only exists for offline table
@@ -437,6 +457,138 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
+   * Handles the subqueries in the filter of the given broker request, in both its PQL and SQL representations.
+   * <p>Currently only supports subquery within the filter.
+   */
+  private void handleSubquery(BrokerRequest brokerRequest, JsonNode jsonRequest,
+      @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+      throws Exception {
+    // PQL path: each filter column is compiled, rewritten in place, then written back in its string form
+    FilterQueryMap pqlFilters = brokerRequest.getFilterSubQueryMap();
+    if (pqlFilters != null) {
+      for (FilterQuery pqlFilter : pqlFilters.getFilterQueryMap().values()) {
+        String columnExpression = pqlFilter.getColumn();
+        if (columnExpression == null) {
+          continue;
+        }
+        TransformExpressionTree compiled = TransformExpressionTree.compileToExpressionTree(columnExpression);
+        handleSubquery(compiled, jsonRequest, requesterIdentity, requestStatistics);
+        pqlFilter.setColumn(compiled.toString());
+      }
+    }
+    // SQL path: the filter expression (when present) is rewritten in place
+    PinotQuery sqlQuery = brokerRequest.getPinotQuery();
+    Expression sqlFilter = sqlQuery != null ? sqlQuery.getFilterExpression() : null;
+    if (sqlFilter != null) {
+      handleSubquery(sqlFilter, jsonRequest, requesterIdentity, requestStatistics);
+    }
+  }
+
+  /**
+   * Rewrites the IN_SUBQUERY calls within the given PQL expression, in place.
+   * <p>The expression tree is walked recursively. For each IN_SUBQUERY function (name matched ignoring case and
+   * underscores), the subquery given as the second argument is executed, then the function is renamed to the
+   * IN_ID_SET transform function and the second argument is replaced with the serialized IdSet from the response.
+   * <p>Arguments of an IN_SUBQUERY function are not searched for nested subqueries.
+   */
+  private void handleSubquery(TransformExpressionTree expression, JsonNode jsonRequest,
+      @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+      throws Exception {
+    if (expression.getExpressionType() != TransformExpressionTree.ExpressionType.FUNCTION) {
+      return;
+    }
+    List<TransformExpressionTree> arguments = expression.getChildren();
+    boolean isInSubquery = StringUtils.remove(expression.getValue(), '_').equalsIgnoreCase(IN_SUBQUERY);
+    if (!isInSubquery) {
+      // Not an IN_SUBQUERY function itself, look for one among the arguments
+      for (TransformExpressionTree argument : arguments) {
+        handleSubquery(argument, jsonRequest, requesterIdentity, requestStatistics);
+      }
+      return;
+    }
+    Preconditions.checkState(arguments.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
+    TransformExpressionTree subquery = arguments.get(1);
+    Preconditions.checkState(subquery.getExpressionType() == TransformExpressionTree.ExpressionType.LITERAL,
+        "Second argument of IN_SUBQUERY must be a literal (subquery)");
+    String idSet =
+        getSerializedIdSetFromSubquery(subquery.getValue(), jsonRequest, requesterIdentity, requestStatistics);
+    expression.setValue(TransformFunctionType.INIDSET.name());
+    arguments.set(1, new TransformExpressionTree(TransformExpressionTree.ExpressionType.LITERAL, idSet, null));
+  }
+
+  /**
+   * Rewrites the IN_SUBQUERY calls within the given SQL expression, in place.
+   * <p>The expression is walked recursively through its function operands. For each IN_SUBQUERY function (name
+   * matched ignoring case and underscores), the subquery given as the second operand is executed, then the function
+   * is renamed to the IN_ID_SET transform function and the second operand is replaced with the serialized IdSet.
+   * <p>Operands of an IN_SUBQUERY function are not searched for nested subqueries.
+   */
+  private void handleSubquery(Expression expression, JsonNode jsonRequest,
+      @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+      throws Exception {
+    Function functionCall = expression.getFunctionCall();
+    if (functionCall == null) {
+      return;
+    }
+    List<Expression> arguments = functionCall.getOperands();
+    if (!StringUtils.remove(functionCall.getOperator(), '_').equalsIgnoreCase(IN_SUBQUERY)) {
+      // Not an IN_SUBQUERY function itself, look for one among the operands
+      for (Expression argument : arguments) {
+        handleSubquery(argument, jsonRequest, requesterIdentity, requestStatistics);
+      }
+      return;
+    }
+    Preconditions.checkState(arguments.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
+    Literal subquery = arguments.get(1).getLiteral();
+    Preconditions.checkState(subquery != null, "Second argument of IN_SUBQUERY must be a literal (subquery)");
+    String idSet =
+        getSerializedIdSetFromSubquery(subquery.getStringValue(), jsonRequest, requesterIdentity, requestStatistics);
+    functionCall.setOperator(TransformFunctionType.INIDSET.name());
+    arguments.set(1, RequestUtils.getLiteralExpression(idSet));
+  }
+
+  /**
+   * Executes the given subquery as a separate request and returns the serialized (Base64) IdSet from its response.
+   * <p>The subquery should be an aggregation-only query with one single IdSet aggregation function.
+   * @throws RuntimeException if the subquery response carries exceptions, has no result, or the result cannot be
+   *                          deserialized into an IdSet (the deserialization failure is attached as the cause)
+   */
+  private String getSerializedIdSetFromSubquery(String subquery, JsonNode jsonRequest,
+      @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
+      throws Exception {
+    // Make a copy of the query request to construct the subquery request so that they share the query options
+    ObjectNode subqueryRequest = jsonRequest.deepCopy();
+    // Replace the query under the same key (SQL or PQL) that the original request used
+    String queryKey = subqueryRequest.has(Broker.Request.SQL) ? Broker.Request.SQL : Broker.Request.PQL;
+    subqueryRequest.put(queryKey, subquery);
+
+    BrokerResponseNative response =
+        (BrokerResponseNative) handleRequest(subqueryRequest, requesterIdentity, requestStatistics);
+    if (response.getExceptionsSize() != 0) {
+      throw new RuntimeException("Caught exception while executing subquery: " + subquery);
+    }
+
+    // The result table and the aggregation results are alternative places the response may carry the value in
+    String serializedIdSet;
+    if (response.getResultTable() != null) {
+      serializedIdSet = (String) response.getResultTable().getRows().get(0)[0];
+    } else if (response.getAggregationResults() != null) {
+      serializedIdSet = (String) response.getAggregationResults().get(0).getValue();
+    } else {
+      throw new RuntimeException("Failed to get serialized IdSet from subquery: " + subquery);
+    }
+
+    // Validate the IdSet on the broker so that an invalid one fails here; keep the cause for troubleshooting
+    try {
+      Preconditions.checkNotNull(IdSets.fromBase64String(serializedIdSet));
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Invalid serialized IdSet: " + serializedIdSet + " returned from the subquery: " + subquery, e);
+    }
+    return serializedIdSet;
+  }
+
+  /**
    * Check if table is in the format of [database_name].[table_name].
    *
    * Only update TableName in QuerySource if there is no existing table in the format of [database_name].[table_name],
@@ -499,11 +651,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Set Log2m value for DistinctCountHLL Function
-   * @param brokerRequest
-   * @param hllLog2mOverride
+   * Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
    */
-  static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
+  private static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
     if (brokerRequest.getAggregationsInfo() != null) {
       for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
         switch (aggregationInfo.getAggregationType().toUpperCase()) {
@@ -524,6 +674,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
+  /**
+   * Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
+   */
   private static void updateDistinctCountHllExpr(Expression expr, int hllLog2mOverride) {
     Function functionCall = expr.getFunctionCall();
     if (functionCall == null) {
@@ -547,11 +700,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Reset limit for selection query if it exceeds maxQuerySelectionLimit.
-   * @param brokerRequest
-   * @param queryLimit
-   *
+   * Overrides the LIMIT/TOP of the query if it exceeds the query limit.
    */
+  @VisibleForTesting
   static void handleQueryLimitOverride(BrokerRequest brokerRequest, int queryLimit) {
     if (queryLimit > 0) {
       // Handle GroupBy for BrokerRequest
@@ -576,7 +727,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given broker request.
+   * Rewrites 'DistinctCount' to 'DistinctCountBitmap' for the given broker request.
    */
   private static void handleDistinctCountBitmapOverride(BrokerRequest brokerRequest) {
     List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
@@ -607,7 +758,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given expression.
+   * Rewrites 'DistinctCount' to 'DistinctCountBitmap' for the given expression.
    */
   private static void handleDistinctCountBitmapOverride(Expression expression) {
     Function function = expression.getFunctionCall();
@@ -625,10 +776,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Check if a SQL parsed BrokerRequest is a literal only query.
-   * @param brokerRequest
-   * @return true if this query selects only Literals
-   *
+   * Returns {@code true} if the given query contains only literals, {@code false} otherwise.
    */
   @VisibleForTesting
   static boolean isLiteralOnlyQuery(BrokerRequest brokerRequest) {
@@ -644,12 +792,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   }
 
   /**
-   * Compute BrokerResponse for literal only query.
-   *
-   * @param brokerRequest
-   * @param compilationStartTimeNs
-   * @param requestStatistics
-   * @return BrokerResponse
+   * Processes the literal only query.
    */
   private BrokerResponse processLiteralOnlyBrokerRequest(BrokerRequest brokerRequest, long compilationStartTimeNs,
       RequestStatistics requestStatistics)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index b0a6ed2..4195674 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -254,19 +254,36 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
     testSqlQuery(query, Collections.singletonList(query));
 
     // IN_ID_SET
-    IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
-    idSet.add(19690L);
-    idSet.add(20355L);
-    idSet.add(21171L);
-    // Also include a non-existing id
-    idSet.add(0L);
-    String serializedIdSet = idSet.toBase64String();
-    String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
-    String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
-    testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
-    String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
-    String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
-    testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
+    {
+      IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
+      idSet.add(19690L);
+      idSet.add(20355L);
+      idSet.add(21171L);
+      // Also include a non-existing id
+      idSet.add(0L);
+      String serializedIdSet = idSet.toBase64String();
+      String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
+      String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
+      testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
+      String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
+      String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
+      testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
+    }
+
+    // IN_SUBQUERY
+    {
+      String inSubqueryQuery =
+          "SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 1";
+      String inQuery =
+          "SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+      testSqlQuery(inSubqueryQuery, Collections.singletonList(inQuery));
+
+      String notInSubqueryQuery =
+          "SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 0";
+      String notInQuery =
+          "SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+      testSqlQuery(notInSubqueryQuery, Collections.singletonList(notInQuery));
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org