You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/05 07:06:15 UTC

[GitHub] [pinot] gortiz commented on a diff in pull request #8979: optimize `order by sorted ASC, unsorted` case

gortiz commented on code in PR #8979:
URL: https://github.com/apache/pinot/pull/8979#discussion_r913454116


##########
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionPartiallyOrderedByOperator.java:
##########
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * An operator for order-by queries that are partially sorted over the sorting keys.
+ */
+public class SelectionPartiallyOrderedByOperator extends BaseOperator<IntermediateResultsBlock> {
+
+  private static final String EXPLAIN_NAME = "SELECT_PARTIAL_ORDERBY";
+
+  private final IndexSegment _indexSegment;
+
+  // Deduped order-by expressions followed by output expressions from SelectionOperatorUtils.extractExpressions()
+  private final List<ExpressionContext> _expressions;
+  private final List<ExpressionContext> _alreadySorted;
+  private final List<ExpressionContext> _toSort;
+
+  private final TransformOperator _transformOperator;
+  private final List<OrderByExpressionContext> _orderByExpressions;
+  private final TransformResultMetadata[] _expressionsMetadata;
+  private final int _numRowsToKeep;
+  private final PartiallySortedListBuilder _sorter;
+
+  private int _numDocsScanned = 0;
+  private long _numEntriesScannedPostFilter = 0;
+  private boolean _used = false;
+  private Comparator<Object[]> _comparator;
+
+  public SelectionPartiallyOrderedByOperator(IndexSegment indexSegment, QueryContext queryContext,
+      List<ExpressionContext> expressions, TransformOperator transformOperator, int sortedExpr) {
+    Preconditions.checkArgument(sortedExpr > 0,
+        "This operator should not be used when sorting expressions doesn't start with sorted expressions");
+    _indexSegment = indexSegment;
+    _expressions = expressions;
+    _transformOperator = transformOperator;
+
+    _orderByExpressions = queryContext.getOrderByExpressions();
+    assert _orderByExpressions != null;
+    int numOrderByExpressions = _orderByExpressions.size();
+
+    _alreadySorted = expressions.subList(0, sortedExpr);
+    _toSort = expressions.subList(sortedExpr, numOrderByExpressions);
+
+    Preconditions.checkArgument(!_toSort.isEmpty(),
+        "This operator should not be used when all sorting expressions are sorted. "
+            + "Use %s instead", SelectionOrderByOperator.class.getName());
+
+    _expressionsMetadata = new TransformResultMetadata[_expressions.size()];
+    for (int i = 0; i < _expressionsMetadata.length; i++) {
+      ExpressionContext expression = _expressions.get(i);
+      _expressionsMetadata[i] = _transformOperator.getResultMetadata(expression);
+    }
+
+    _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+
+    int expectedSize = Math.min(_numRowsToKeep * 2, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+    Comparator<Object[]> sortedComparator =
+        SelectionOrderByOperator.getComparator(_orderByExpressions, _expressionsMetadata, true, 0, sortedExpr);
+    Comparator<Object[]> unsortedComparator =
+        SelectionOrderByOperator.getComparator(_orderByExpressions, _expressionsMetadata, true, sortedExpr,
+            numOrderByExpressions);
+    _sorter = new PartiallySortedListBuilder(expectedSize, sortedComparator, unsortedComparator);
+    _comparator = SelectionOrderByOperator.getComparator(_orderByExpressions, _expressionsMetadata, false);
+  }
+
+  @Override
+  public List<Operator> getChildOperators() {
+    return Collections.singletonList(_transformOperator);
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    if (!_used) {
+      _used = true;
+      fetch();
+
+      DataSchema dataSchema = createDataSchema();
+
+      return new IntermediateResultsBlock.ListSelection(dataSchema, _sorter.build(), _comparator);
+    }
+    return null;
+  }
+
+  private DataSchema createDataSchema() {
+
+    int numExpressions = _expressions.size();
+
+    // Create the data schema
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < columnNames.length; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+    }
+    for (int i = 0; i < numExpressions; i++) {
+      TransformResultMetadata expressionMetadata = _expressionsMetadata[i];
+      columnDataTypes[i] =
+          DataSchema.ColumnDataType.fromDataType(expressionMetadata.getDataType(), expressionMetadata.isSingleValue());
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  /**
+   * Fetches the rows that are needed into {@link #_sorter} and updates the metrics.
+   *
+   * The sorter can still be used.
+   */
+  private void fetch() {
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+
+    TransformBlock transformBlock;
+    int numColumnsProjected = _transformOperator.getNumColumnsProjected();
+    try {
+      while ((transformBlock = _transformOperator.nextBlock()) != null) {
+        for (int i = 0; i < numExpressions; i++) {
+          ExpressionContext expression = _expressions.get(i);
+          blockValSets[i] = transformBlock.getBlockValueSet(expression);
+        }
+        RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+        int numDocsFetched = transformBlock.getNumDocs();
+        _numDocsScanned += numDocsFetched;
+        for (int i = 0; i < numDocsFetched; i++) {
+          boolean newBlock = _sorter.add(blockValueFetcher.getRow(i));
+          if (newBlock && _sorter.sortedSize() >= _numRowsToKeep) {
+            // We moved to a new section of the sorted values and we already have enough fully sorted rows.
+            // Therefore, we can stop the execution here.
+            return;
+          }
+        }
+      }
+    } finally {
+      _numEntriesScannedPostFilter = (long) _numDocsScanned * numColumnsProjected;
+    }
+  }
+
+  @Override
+  public String toExplainString() {
+    StringBuilder sb = new StringBuilder(EXPLAIN_NAME);
+
+    sb.append("(sortedList: ");
+    concatList(sb, _alreadySorted);
+
+    sb.append(", unsortedList: ");
+    concatList(sb, _toSort);
+
+    sb.append(", rest: ");
+    concatList(sb, _expressions.subList(_alreadySorted.size() + _toSort.size(), _expressions.size()));
+
+    sb.append(')');
+    return sb.toString();
+  }
+
+  private void concatList(StringBuilder sb, List<?> list) {
+    sb.append('(');
+    Iterator<?> it = list.iterator();
+    if (it.hasNext()) {
+      sb.append(it.next());
+      while (it.hasNext()) {
+        sb.append(", ").append(it.next());
+      }
+    }
+    sb.append(')');
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, _numEntriesScannedPostFilter,
+        numTotalDocs);
+  }
+
+  /**
+   * A private class used to build a sorted list by adding partially sorted data.
+   *
+   * Specifically, this class has been designed to receive successive calls to {@link #add(Object[])} followed by a
+   * single call to {@link #build()}. Once that method has been called, the behavior of this object is undefined and
+   * therefore it should not be used.
+   *
+   * Rows must be inserted in ascending order according to the partial order specified by {@link #_sortedComparator}.
+   * This comparator will define <i>partitions</i> of rows. All the rows in the same partition are considered equal
+   * by that comparator.
+   *
+   * There is a second comparator called {@link #_unsortedComparator} that is used to sort rows inside each partition.
+   * When calling {@link #add(Object[])} with a row that doesn't belong to the current partition, a new one is started
+   * and the previous one is sorted. Therefore, this object maintains the invariant that at any moment there are at
+   * least {@link #sortedSize()} elements that are completely sorted, which means that no matter which elements are
+   * added after that, these elements are going to be the smallest.
+   */
+  private static class PartiallySortedListBuilder {
+    private final ArrayList<Object[]> _rows;
+    private final Comparator<Object[]> _sortedComparator;
+    private final Comparator<Object[]> _unsortedComparator;
+
+    public PartiallySortedListBuilder(int expectedSize, Comparator<Object[]> sortedComparator,
+        Comparator<Object[]> unsortedComparator) {
+      _rows = new ArrayList<>(expectedSize);
+      _sortedComparator = sortedComparator;
+      _unsortedComparator = unsortedComparator;
+    }
+
+    @Nullable
+    private Object[] _lastBlockRow;
+    private int _lastSorted;
+
+    /**
+     * Adds the given row to this object. The new row must be equal to or higher than the last added row in relation to
+     * {@link #_sortedComparator}.
+     *
+     * @param row The row to add. The values of the already sorted columns must be equal or higher than the last added
+     *           row, if any.
+     * @return true if and only if the previous block was closed.
+     */
+    boolean add(Object[] row) {
+      boolean result;
+      if (_lastBlockRow == null) {
+        _lastSorted = 0;
+        _lastBlockRow = row;
+        result = false;
+      } else {
+        int cmp = _sortedComparator.compare(_lastBlockRow, row);
+        assert cmp >= 0 : "Row " + Arrays.toString(row) + " is lower than previously added row"

Review Comment:
   This is more of a "computable comment" than something I would like to evaluate every time an element is added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org