You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/07/08 00:41:57 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #7113: High Cardinality Order by Optimization

Jackie-Jiang commented on a change in pull request #7113:
URL: https://github.com/apache/incubator-pinot/pull/7113#discussion_r665779622



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -97,14 +116,38 @@ public AggregationGroupByOrderByOperator run() {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     if (_transformPlanNode != null) {
       // Do not use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
-          _queryContext, false);
+      return new AggregationGroupByOrderByOperator(_indexSegment, _aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, false);
     } else {
       // Use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(),
-          numTotalDocs, _queryContext, true);
+      return new AggregationGroupByOrderByOperator(_indexSegment, _aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, true);
+    }
+  }
+
+  private boolean checkOrderByOptimization() {
+    if (_queryContext.getHavingFilter() != null) {
+      return false;
+    }
+    Set<ExpressionContext> orderByExpressionsSet = new HashSet<>();
+    // Filter out function expressions
+    for (OrderByExpressionContext orderByExpressionContext : _orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      if (expression.getType() == ExpressionContext.Type.FUNCTION) {

Review comment:
       We should only skip `AGGREGATION` function, not `TRANSFORM` function

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
##########
@@ -95,6 +96,8 @@ private static QueryContext convertSQL(BrokerRequest brokerRequest) {
       for (Expression thriftExpression : groupByList) {
         groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression));
       }
+      pinotQuery.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.GROUP_BY_MODE,

Review comment:
       This should not be required because we already have a check on the broker side to ensure the SQL query always has this option set.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -87,6 +101,11 @@ public AggregationGroupByOrderByPlanNode(IndexSegment indexSegment, QueryContext
 
     Set<ExpressionContext> expressionsToTransform =
         AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, _groupByExpressions);
+    _enableGroupByOpt = checkOrderByOptimization();

Review comment:
       It seems most of the checks for enabling the optimization rely on the column metadata and have to happen in the operator. Let's move the whole check inside the operator then. Splitting the checks across 2 places is hard to manage.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
##########
@@ -97,14 +116,38 @@ public AggregationGroupByOrderByOperator run() {
     int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
     if (_transformPlanNode != null) {
       // Do not use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs,
-          _queryContext, false);
+      return new AggregationGroupByOrderByOperator(_indexSegment, _aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, false);
     } else {
       // Use star-tree
-      return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions,
-          _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(),
-          numTotalDocs, _queryContext, true);
+      return new AggregationGroupByOrderByOperator(_indexSegment, _aggregationFunctions, _groupByExpressions,
+          _orderByExpressionContexts.toArray(new OrderByExpressionContext[0]), _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(), numTotalDocs, _queryContext,
+          _enableGroupByOpt, true);
+    }
+  }
+
+  private boolean checkOrderByOptimization() {
+    if (_queryContext.getHavingFilter() != null) {
+      return false;
+    }
+    Set<ExpressionContext> orderByExpressionsSet = new HashSet<>();
+    // Filter out function expressions
+    for (OrderByExpressionContext orderByExpressionContext : _orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      if (expression.getType() == ExpressionContext.Type.FUNCTION) {
+        return false;
+      }
+      orderByExpressionsSet.add(expression);
+    }
+    // Add group by expressions to order by expressions
+    for (ExpressionContext groupByExpression: _groupByExpressions) {
+      if (!orderByExpressionsSet.contains(groupByExpression)) {
+        _orderByExpressionContexts.add(new OrderByExpressionContext(groupByExpression, true));

Review comment:
       We should not directly modify the query context because it is shared among multiple segments, and doing so can cause a race condition. Ideally this rewrite should be done either on the broker side or before the query planning on the server side.
   For now, we can keep a local copy of the order-by expressions list.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
##########
@@ -168,4 +198,226 @@ private int calculateMinSegmentTrimSize() {
     }
     return _minSegmentTrimSize;
   }
+
+  private TransformOperator constructTransformOperator() {
+    List<TransformResultMetadata> orderByExpressionMetadataList = new ArrayList<>();
+    for (OrderByExpressionContext orderByExpressionContext : _orderByExpressionContexts) {
+      ExpressionContext expression = orderByExpressionContext.getExpression();
+      TransformResultMetadata orderByExpressionMetadata = _transformOperator.getResultMetadata(expression);
+      // Only handle single value column now
+      if (!orderByExpressionMetadata.isSingleValue()) {
+        return _transformOperator;
+      }
+      orderByExpressionMetadataList.add(orderByExpressionMetadata);
+    }
+    return constructNewTransformOperator(orderByExpressionMetadataList.toArray(new TransformResultMetadata[0]));
+  }
+
+  /**
+   * Two pass approach for orderBy on groupBy columns. Fetch the orderBy columns to rank the top results
+   * whose docIds will be used to construct a new transform operator for aggregations.
+   */
+  private TransformOperator constructNewTransformOperator(TransformResultMetadata[] orderByExpressionMetadata) {
+    int numOrderByExpressions = _orderByExpressionContexts.length;
+    HashMap<Key, MutableRoaringBitmap> groupByKeyMap = new HashMap<>();
+    TransformBlock transformBlock;
+
+    Dictionary[] dictionaries = new Dictionary[numOrderByExpressions];
+    boolean[] hasDict = new boolean[numOrderByExpressions];
+    int numNoDict = 0;
+    long cardinalityProduct = 1L;
+    boolean longOverflow = false;
+    // Get dictionaries and calculate cardinalities
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      ExpressionContext expression = _orderByExpressionContexts[i].getExpression();
+      hasDict[i] = orderByExpressionMetadata[i].hasDictionary();
+      if (hasDict[i]) {
+        dictionaries[i] = _transformOperator.getDictionary(expression);
+        int cardinality = dictionaries[i].length();
+        if (!longOverflow) {
+          if (cardinalityProduct > Long.MAX_VALUE / cardinality) {
+            longOverflow = true;
+          } else {
+            cardinalityProduct *= cardinality;
+          }
+        }
+      }
+      numNoDict += hasDict[i] ? 0 : 1;
+    }
+    //TODO: Determine reasonable threshold
+    if (!longOverflow && cardinalityProduct < _limit || cardinalityProduct < 500000) {
+      return _transformOperator;
+    }
+    BlockValSet[] blockValSets = new BlockValSet[numNoDict];
+    PriorityQueue<Object[]> PQ = new PriorityQueue<>(_limit,
+        getComparator(orderByExpressionMetadata, numOrderByExpressions, dictionaries, hasDict));
+    int[][] dictionaryIds = new int[numOrderByExpressions - numNoDict][];
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      int numDocsFetched = transformBlock.getNumDocs();
+      int[] docIds = transformBlock.getBlockValueSet("$docId").getIntValuesSV();
+      int dictionaryIdsIndex = 0;
+      // For dictionary-based columns, we fetch the dictionary ids. Otherwise fetch the actual value
+      for (int i = 0; i < numOrderByExpressions; i++) {
+        ExpressionContext expression = _orderByExpressionContexts[i].getExpression();
+        BlockValSet blockValSet = transformBlock.getBlockValueSet(expression);
+        if (hasDict[i]) {
+          dictionaryIds[dictionaryIdsIndex] = blockValSet.getDictionaryIdsSV();
+          dictionaryIdsIndex++;
+        } else {
+          blockValSets[i - dictionaryIdsIndex] = blockValSet;
+        }
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      // TODO: Add special optimization for all dict condition
+      for (int i = 0; i < numDocsFetched; i++) {
+        int docId = docIds[i];
+        // Generate key based on the dictionary Id/fetched values
+        Object[] keys = new Object[numOrderByExpressions];
+        Object[] row = new Object[numNoDict];
+        if (numNoDict != 0) {
+          blockValueFetcher.getRow(i, row, 0);
+        }
+        dictionaryIdsIndex = 0;
+        for (int j = 0; j < numOrderByExpressions; j++) {
+          if (hasDict[j]) {
+            keys[j] = dictionaryIds[dictionaryIdsIndex][i];
+            dictionaryIdsIndex++;
+          } else {
+            keys[j] = row[j - dictionaryIdsIndex];
+          }
+        }
+        AddToObjectPriorityQueue(keys, docId, PQ, groupByKeyMap);
+      }
+    }
+    // Collect docIds
+    Collection<MutableRoaringBitmap> docIdList = groupByKeyMap.values();
+    int numDocs = 0;
+    MutableRoaringBitmap docIds = new MutableRoaringBitmap();
+    for (MutableRoaringBitmap filteredDocIds : docIdList) {
+      for (Integer docId : filteredDocIds) {
+        docIds.add(docId);
+        numDocs++;
+      }
+    }
+
+    // Make a new transform operator
+    Set<ExpressionContext> expressionsToTransform =
+        AggregationFunctionUtils.collectExpressionsToTransform(_aggregationFunctions, _groupByExpressions);
+    Set<String> columns = new HashSet<>();
+    for (ExpressionContext expression : expressionsToTransform) {
+      expression.getColumns(columns);
+    }
+    Map<String, DataSource> dataSourceMap = new HashMap<>();

Review comment:
       The `DataSource` should be retrieved from the `ProjectionOperator` instead of the segment because the star-tree won't share the same data source




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org