You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/08 04:13:43 UTC

[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #5661: Optimize selection order-by when not all selected expressions are ordered

mayankshriv commented on a change in pull request #5661:
URL: https://github.com/apache/incubator-pinot/pull/5661#discussion_r451270291



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
##########
@@ -143,24 +168,145 @@ public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext queryCon
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
+    if (_expressions.size() == _orderByExpressions.size()) {
+      return computeAllOrdered();
+    } else {
+      return computePartiallyOrdered();
+    }
+  }
+
+  /**
+   * Helper method to compute the result when all the output expressions are ordered.
+   */
+  private IntermediateResultsBlock computeAllOrdered() {
+    int numExpressions = _expressions.size();
+
+    // Fetch all the expressions and insert them into the priority queue
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      int numExpressions = _expressions.size();
-      BlockValSet[] blockValSets = new BlockValSet[numExpressions];
       for (int i = 0; i < numExpressions; i++) {
         ExpressionContext expression = _expressions.get(i);
         blockValSets[i] = transformBlock.getBlockValueSet(expression);
       }
       RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
-
       int numDocsFetched = transformBlock.getNumDocs();
       _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
       }
     }
+    _numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
 
-    return new IntermediateResultsBlock(_dataSchema, _rows);
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+      TransformResultMetadata expressionMetadata = _orderByExpressionMetadata[i];
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    return new IntermediateResultsBlock(dataSchema, _rows);
+  }
+
+  /**
+   * Helper method to compute the result when not all the output expressions are ordered.
+   */
+  private IntermediateResultsBlock computePartiallyOrdered() {
+    int numExpressions = _expressions.size();
+    int numOrderByExpressions = _orderByExpressions.size();
+
+    // Fetch the order-by expressions and docIds and insert them into the priority queue
+    BlockValSet[] blockValSets = new BlockValSet[numOrderByExpressions + 1];
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      for (int i = 0; i < numOrderByExpressions; i++) {
+        ExpressionContext expression = _orderByExpressions.get(i).getExpression();
+        blockValSets[i] = transformBlock.getBlockValueSet(expression);
+      }
+      blockValSets[numOrderByExpressions] = transformBlock.getBlockValueSet(BuiltInVirtualColumn.DOCID);
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      int numDocsFetched = transformBlock.getNumDocs();
+      _numDocsScanned += numDocsFetched;
+      for (int i = 0; i < numDocsFetched; i++) {
+        // NOTE: We pre-allocate the complete row so that we can fill up the non-order-by output expression values later
+        //       without creating extra rows or re-constructing the priority queue. We can change the values in-place
+        //       because the comparator only compare the values for the order-by expressions.
+        Object[] row = new Object[numExpressions];
+        blockValueFetcher.getRow(i, row, 0);
+        SelectionOperatorUtils.addToPriorityQueue(row, _rows, _numRowsToKeep);
+      }
+    }
+
+    // Copy the rows (shallow copy so that any modification will also be reflected to the priority queue) into a list,
+    // and store the document ids into a bitmap
+    int numRows = _rows.size();
+    List<Object[]> rowList = new ArrayList<>(numRows);
+    MutableRoaringBitmap docIds = new MutableRoaringBitmap();
+    for (Object[] row : _rows) {
+      rowList.add(row);
+      docIds.add((int) row[numOrderByExpressions]);

Review comment:
       Nit for readability: either add a variable, or a comment to indicate row[numOrderByExpressions] has the docId.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
##########
@@ -143,24 +168,145 @@ public SelectionOrderByOperator(IndexSegment indexSegment, QueryContext queryCon
 
   @Override
   protected IntermediateResultsBlock getNextBlock() {
+    if (_expressions.size() == _orderByExpressions.size()) {
+      return computeAllOrdered();
+    } else {
+      return computePartiallyOrdered();
+    }
+  }
+
+  /**
+   * Helper method to compute the result when all the output expressions are ordered.
+   */
+  private IntermediateResultsBlock computeAllOrdered() {
+    int numExpressions = _expressions.size();
+
+    // Fetch all the expressions and insert them into the priority queue
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
     TransformBlock transformBlock;
     while ((transformBlock = _transformOperator.nextBlock()) != null) {
-      int numExpressions = _expressions.size();
-      BlockValSet[] blockValSets = new BlockValSet[numExpressions];
       for (int i = 0; i < numExpressions; i++) {
         ExpressionContext expression = _expressions.get(i);
         blockValSets[i] = transformBlock.getBlockValueSet(expression);
       }
       RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
-
       int numDocsFetched = transformBlock.getNumDocs();
       _numDocsScanned += numDocsFetched;
       for (int i = 0; i < numDocsFetched; i++) {
         SelectionOperatorUtils.addToPriorityQueue(blockValueFetcher.getRow(i), _rows, _numRowsToKeep);
       }
     }
+    _numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
 
-    return new IntermediateResultsBlock(_dataSchema, _rows);
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+      TransformResultMetadata expressionMetadata = _orderByExpressionMetadata[i];
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+
+    return new IntermediateResultsBlock(dataSchema, _rows);
+  }
+
+  /**
+   * Helper method to compute the result when not all the output expressions are ordered.
+   */
+  private IntermediateResultsBlock computePartiallyOrdered() {

Review comment:
       Not required for this PR, but does it help if we completely defer the fetching of non-orderby expressions until combine phase? I think that might actually improve further quite a bit? We might need to have a segmentId+docId as a virtual column.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org