You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/09 23:17:57 UTC

[pinot] branch master updated: Enhance select order-by combine to use merge sort (#10357)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd8a7dcff Enhance select order-by combine to use merge sort (#10357)
6bd8a7dcff is described below

commit 6bd8a7dcfff67b6fb80f96a60e502d7a6df65336
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Mar 9 15:17:51 2023 -0800

    Enhance select order-by combine to use merge sort (#10357)
---
 .../blocks/results/AggregationResultsBlock.java    |   3 +-
 .../operator/blocks/results/BaseResultsBlock.java  |   3 +-
 .../blocks/results/DistinctResultsBlock.java       |   2 +-
 .../blocks/results/ExceptionResultsBlock.java      |   4 +-
 .../blocks/results/ExplainResultsBlock.java        |   3 +-
 .../blocks/results/GroupByResultsBlock.java        |   2 +-
 .../blocks/results/MetadataResultsBlock.java       |   4 +-
 .../blocks/results/SelectionResultsBlock.java      |  47 ++++-----
 .../combine/BaseSingleBlockCombineOperator.java    |   2 +-
 ...xValueBasedSelectionOrderByCombineOperator.java |  21 +---
 .../combine/merger/ResultsBlockMerger.java         |  10 --
 .../merger/SelectionOnlyResultsBlockMerger.java    |   7 +-
 .../merger/SelectionOrderByResultsBlockMerger.java |  15 +--
 .../query/LinearSelectionOrderByOperator.java      | 111 ++++++++++-----------
 .../operator/query/SelectionOrderByOperator.java   |  22 ++--
 .../query/selection/SelectionOperatorService.java  |   7 +-
 .../query/selection/SelectionOperatorUtils.java    |  69 +++++++++----
 .../core/query/utils/OrderByComparatorFactory.java |  14 +--
 .../combine/SelectionCombineOperatorTest.java      |  30 +++---
 .../query/LinearSelectionOrderByOperatorTest.java  |  26 ++---
 .../selection/SelectionOperatorServiceTest.java    |  40 ++++----
 ...InnerSegmentSelectionMultiValueQueriesTest.java |  11 +-
 ...erSegmentSelectionMultiValueRawQueriesTest.java |   9 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  63 ++++++------
 24 files changed, 245 insertions(+), 280 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 54fa0b9558..b2bb5b7308 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.common.datatable.DataTable;
@@ -77,7 +76,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     return Collections.singletonList(_results.toArray());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index 9d119afce7..b611430d50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -171,7 +170,7 @@ public abstract class BaseResultsBlock implements Block {
    * Returns the rows for the results. Return {@code null} when the block only contains metadata.
    */
   @Nullable
-  public abstract Collection<Object[]> getRows(QueryContext queryContext);
+  public abstract List<Object[]> getRows(QueryContext queryContext);
 
   /**
    * Returns a data table without metadata or exception attached.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index 99a0f868c6..0cbeebd176 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -62,7 +62,7 @@ public class DistinctResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     List<Object[]> rows = new ArrayList<>(_distinctTable.size());
     for (Record record : _distinctTable.getRecords()) {
       rows.add(record.getValues());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 783c7c22a4..02abb39b84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import java.util.Collection;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
@@ -51,7 +51,7 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
 
   @Nullable
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     return null;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
index bb3e71958b..5789c9ba76 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.operator.blocks.results;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +51,7 @@ public class ExplainResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     List<Object[]> rows = new ArrayList<>(_entries.size());
     for (ExplainEntry entry : _entries) {
       rows.add(entry.toRow());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index f431f4bd6f..7e5057b4d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -149,7 +149,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
 
   @Nullable
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     if (_table == null) {
       return Collections.emptyList();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
index 3ef747d2a1..97b941f3d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import java.util.Collection;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -41,7 +41,7 @@ public class MetadataResultsBlock extends BaseResultsBlock {
 
   @Nullable
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     return null;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 363cd1a4f5..b19c9898c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,12 +18,9 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
-import java.util.PriorityQueue;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -36,32 +33,37 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
  */
 public class SelectionResultsBlock extends BaseResultsBlock {
   private final DataSchema _dataSchema;
-  private final Collection<Object[]> _rows;
   private final Comparator<? super Object[]> _comparator;
+  private List<Object[]> _rows;
 
-  public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
-    this(dataSchema, rows, null);
-  }
-
-  public SelectionResultsBlock(DataSchema dataSchema, PriorityQueue<Object[]> rows) {
-    this(dataSchema, rows, rows.comparator());
-  }
-
-  public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]> rows,
+  public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows,
       @Nullable Comparator<? super Object[]> comparator) {
     _dataSchema = dataSchema;
     _rows = rows;
     _comparator = comparator;
   }
 
+  public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
+    this(dataSchema, rows, null);
+  }
+
   public DataSchema getDataSchema() {
     return _dataSchema;
   }
 
-  public Collection<Object[]> getRows() {
+  public List<Object[]> getRows() {
     return _rows;
   }
 
+  public void setRows(List<Object[]> rows) {
+    _rows = rows;
+  }
+
+  @Nullable
+  public Comparator<? super Object[]> getComparator() {
+    return _comparator;
+  }
+
   @Override
   public int getNumRows() {
     return _rows.size();
@@ -73,25 +75,10 @@ public class SelectionResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  public Collection<Object[]> getRows(QueryContext queryContext) {
+  public List<Object[]> getRows(QueryContext queryContext) {
     return _rows;
   }
 
-  public SelectionResultsBlock convertToPriorityQueueBased() {
-    if (_rows instanceof PriorityQueue) {
-      return this;
-    }
-    Preconditions.checkState(_comparator != null, "No comparator specified in the results block");
-    PriorityQueue<Object[]> result = new PriorityQueue<>(_comparator);
-    result.addAll(_rows);
-    return new SelectionResultsBlock(_dataSchema, result);
-  }
-
-  public PriorityQueue<Object[]> getRowsAsPriorityQueue() {
-    Preconditions.checkState(_rows instanceof PriorityQueue, "The results block is not backed by a priority queue");
-    return (PriorityQueue<Object[]>) _rows;
-  }
-
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
index adfeb710fc..7366a61b69 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
@@ -148,7 +148,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock>
         return blockToMerge;
       }
       if (mergedBlock == null) {
-        mergedBlock = _resultsBlockMerger.convertToMergeableBlock((T) blockToMerge);
+        mergedBlock = (T) blockToMerge;
       } else {
         _resultsBlockMerger.mergeResultsBlocks(mergedBlock, (T) blockToMerge);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index f33be5538e..666332d5ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -285,17 +285,16 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
         return blockToMerge;
       }
       if (mergedBlock == null) {
-        mergedBlock = convertToMergeableBlock((SelectionResultsBlock) blockToMerge);
+        mergedBlock = (SelectionResultsBlock) blockToMerge;
       } else {
         mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
       }
       numBlocksMerged++;
 
       // Update the boundary value if enough rows are collected
-      PriorityQueue<Object[]> selectionResult = mergedBlock.getRowsAsPriorityQueue();
-      if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
-        assert selectionResult.peek() != null;
-        _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+      List<Object[]> rows = mergedBlock.getRows();
+      if (rows.size() == _numRowsToKeep) {
+        _globalBoundaryValue.set((Comparable) rows.get(_numRowsToKeep - 1)[0]);
       }
     }
     return mergedBlock;
@@ -315,17 +314,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
           QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
       return;
     }
-
-    PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
-    Collection<Object[]> rowsToMerge = blockToMerge.getRows();
-    assert mergedRows != null && rowsToMerge != null;
-    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
-  }
-
-  protected SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
-    // This may create a copy or return the same instance. Anyway, this operator is the owner of the
-    // value now, so it can mutate it.
-    return resultsBlock.convertToPriorityQueueBased();
+    SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
   }
 
   private static class MinMaxValueContext {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
index 0c77e66c93..318dfd2bd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
@@ -44,14 +44,4 @@ public interface ResultsBlockMerger<T extends BaseResultsBlock> {
   default boolean isQuerySatisfied(T resultsBlock) {
     return false;
   }
-
-  /**
-   * Converts the given results block into a mergeable results block.
-   *
-   * <p>This conversion is necessary if a block is used as the first argument for:
-   * {@link ResultsBlockMerger#mergeResultsBlocks(BaseResultsBlock, BaseResultsBlock)}.
-   */
-  default T convertToMergeableBlock(T resultsBlock) {
-    return resultsBlock;
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
index 54b6185be1..aec95823c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
-import java.util.Collection;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -56,10 +55,6 @@ public class SelectionOnlyResultsBlockMerger implements ResultsBlockMerger<Selec
           QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
       return;
     }
-
-    Collection<Object[]> mergedRows = mergedBlock.getRows();
-    Collection<Object[]> rowsToMerge = blockToMerge.getRows();
-    assert mergedRows != null && rowsToMerge != null;
-    SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+    SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
index 569b2b8e3b..a5483853cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.core.operator.combine.merger;
 
-import java.util.Collection;
-import java.util.PriorityQueue;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -52,17 +50,6 @@ public class SelectionOrderByResultsBlockMerger implements ResultsBlockMerger<Se
           QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
       return;
     }
-
-    PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
-    Collection<Object[]> rowsToMerge = blockToMerge.getRows();
-    assert mergedRows != null && rowsToMerge != null;
-    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
-  }
-
-  @Override
-  public SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
-    // This may create a copy or return the same instance. Anyway, this operator is the owner of the
-    // value now, so it can mutate it.
-    return resultsBlock.convertToPriorityQueueBased();
+    SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
index e9fe801262..c86e5a114d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
@@ -74,13 +74,8 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
   protected final List<OrderByExpressionContext> _orderByExpressions;
   protected final TransformResultMetadata[] _expressionsMetadata;
   protected final int _numRowsToKeep;
-  private final Supplier<ListBuilder> _listBuilderSupplier;
-  protected boolean _used = false;
-  /**
-   * The comparator used to build the resulting {@link SelectionResultsBlock}, which sorts rows in reverse order to the
-   * one specified in the query.
-   */
-  protected Comparator<Object[]> _comparator;
+  protected final Comparator<Object[]> _comparator;
+  protected final Supplier<ListBuilder> _listBuilderSupplier;
 
   /**
    * @param expressions Order-by expressions must be at the head of the list.
@@ -107,21 +102,20 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
     }
 
     _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+    _comparator =
+        OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled);
 
     if (_toSort.isEmpty()) {
       _listBuilderSupplier = () -> new TotallySortedListBuilder(_numRowsToKeep);
     } else {
       Comparator<Object[]> sortedComparator =
-          OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, false, _nullHandlingEnabled,
-              0, numSortedExpressions);
+          OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled, 0,
+              numSortedExpressions);
       Comparator<Object[]> unsortedComparator =
-          OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled,
+          OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled,
               numSortedExpressions, numOrderByExpressions);
       _listBuilderSupplier = () -> new PartiallySortedListBuilder(_numRowsToKeep, sortedComparator, unsortedComparator);
     }
-
-    _comparator =
-        OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled);
   }
 
   @Override
@@ -221,17 +215,7 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
 
   @Override
   protected SelectionResultsBlock getNextBlock() {
-    Preconditions.checkState(!_used, "nextBlock was called more than once");
-    _used = true;
-    List<Object[]> list = fetch(_listBuilderSupplier);
-
-    DataSchema dataSchema = createDataSchema();
-
-    if (list.size() > _numRowsToKeep) {
-      list = new ArrayList<>(list.subList(0, _numRowsToKeep));
-    }
-
-    return new SelectionResultsBlock(dataSchema, list, _comparator);
+    return new SelectionResultsBlock(createDataSchema(), fetch(_listBuilderSupplier), _comparator);
   }
 
   protected DataSchema createDataSchema() {
@@ -335,25 +319,28 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
    */
   @VisibleForTesting
   static class PartiallySortedListBuilder implements ListBuilder {
-    /**
-     * A list with all the elements that have been already sorted.
-     */
-    private final ArrayList<Object[]> _sorted;
-    /**
-     * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows.
-     */
-    private PriorityQueue<Object[]> _lastPartitionQueue;
+
+    private final int _maxNumRows;
+
     /**
      * The comparator that defines the partitions and the one that impose in which order add has to be called.
      */
     private final Comparator<Object[]> _partitionComparator;
+
     /**
-     * The comparator that sorts different rows on each partition, which sorts rows in reverse order to the one
-     * specified in the query.
+     * The comparator that sorts different rows on each partition.
      */
     private final Comparator<Object[]> _unsortedComparator;
 
-    private final int _maxNumRows;
+    /**
+     * List of rows, where the first _numSortedRows are sorted.
+     */
+    private final ArrayList<Object[]> _rows;
+
+    /**
+     * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows.
+     */
+    private PriorityQueue<Object[]> _lastPartitionQueue;
 
     private Object[] _lastPartitionRow;
     private int _numSortedRows;
@@ -361,50 +348,52 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
     public PartiallySortedListBuilder(int maxNumRows, Comparator<Object[]> partitionComparator,
         Comparator<Object[]> unsortedComparator) {
       _maxNumRows = maxNumRows;
-      _sorted = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
       _partitionComparator = partitionComparator;
       _unsortedComparator = unsortedComparator;
+      _rows = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
     }
 
     @Override
     public boolean add(Object[] row) {
       if (_lastPartitionRow == null) {
         _lastPartitionRow = row;
-        _sorted.add(row);
+        _rows.add(row);
         return false;
       }
-      int cmp = _partitionComparator.compare(row, _lastPartitionRow);
-      if (cmp < 0) {
-        throw new IllegalArgumentException(
-            "Row with docId " + _sorted.size() + " is not sorted compared to the previous one");
-      }
+      int compareResult = _partitionComparator.compare(row, _lastPartitionRow);
+      Preconditions.checkState(compareResult >= 0, "Rows are not sorted");
 
-      boolean newPartition = cmp > 0;
-      if (_sorted.size() < _maxNumRows) {
+      boolean newPartition = compareResult > 0;
+      int numRows = _rows.size();
+      if (numRows < _maxNumRows) {
         // we don't have enough rows yet
         if (newPartition) {
           _lastPartitionRow = row;
-          _numSortedRows = _sorted.size();
+          if (numRows - _numSortedRows > 1) {
+            _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator);
+          }
+          _numSortedRows = numRows;
         }
         // just add the new row to the result list
-        _sorted.add(row);
+        _rows.add(row);
         return false;
       }
 
       // enough rows have been collected
-      assert _sorted.size() == _maxNumRows;
-      if (newPartition) { // and the new element belongs to a new partition, so we can just ignore it
+      assert numRows == _maxNumRows;
+      if (newPartition) {
+        // new element belongs to a new partition, so we can just ignore it
         return true;
       }
       // new element doesn't belong to a new partition, so we may need to add it
-      if (_lastPartitionQueue == null) { // we have exactly _numRows rows, and the new belongs to the last partition
+      if (_lastPartitionQueue == null) {
         // we need to prepare the priority queue
-        int numRowsInPriorityQueue = _maxNumRows - _numSortedRows;
-        _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator);
-        _lastPartitionQueue.addAll(_sorted.subList(_numSortedRows, _maxNumRows));
+        int numRowsInPriorityQueue = numRows - _numSortedRows;
+        _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator.reversed());
+        _lastPartitionQueue.addAll(_rows.subList(_numSortedRows, numRows));
       }
       // add the new element if it is lower than the greatest element stored in the partition
-      if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) > 0) {
+      if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) < 0) {
         _lastPartitionQueue.poll();
         _lastPartitionQueue.offer(row);
       }
@@ -413,14 +402,18 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
 
     @Override
     public List<Object[]> build() {
-      if (_lastPartitionQueue != null) {
-        assert _lastPartitionQueue.size() == _maxNumRows - _numSortedRows;
-        Iterator<Object[]> lastPartitionIt = _lastPartitionQueue.iterator();
-        for (int i = _numSortedRows; i < _maxNumRows; i++) {
-          _sorted.set(i, lastPartitionIt.next());
+      int numRows = _rows.size();
+      if (_lastPartitionQueue == null) {
+        if (numRows - _numSortedRows > 1) {
+          _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator);
+        }
+      } else {
+        assert numRows == _maxNumRows && _lastPartitionQueue.size() == numRows - _numSortedRows;
+        for (int i = numRows - 1; i >= _numSortedRows; i--) {
+          _rows.set(i, _lastPartitionQueue.poll());
         }
       }
-      return _sorted;
+      return _rows;
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 107cbe0d7b..9c6702a481 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -80,6 +81,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
   private final List<OrderByExpressionContext> _orderByExpressions;
   private final TransformResultMetadata[] _orderByExpressionMetadata;
   private final int _numRowsToKeep;
+  private final Comparator<Object[]> _comparator;
   private final PriorityQueue<Object[]> _rows;
 
   private int _numDocsScanned = 0;
@@ -103,11 +105,10 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
     }
 
     _numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
-    Comparator<Object[]> comparator =
-        OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, true,
-            _nullHandlingEnabled);
+    _comparator =
+        OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, _nullHandlingEnabled);
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
-        comparator);
+        _comparator.reversed());
   }
 
   @Override
@@ -183,7 +184,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
     }
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
 
-    return new SelectionResultsBlock(dataSchema, _rows);
+    return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator);
   }
 
   /**
@@ -303,7 +304,16 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
     }
     DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
 
-    return new SelectionResultsBlock(dataSchema, _rows);
+    return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator);
+  }
+
+  private List<Object[]> getSortedRows() {
+    int numRows = _rows.size();
+    Object[][] sortedRows = new Object[numRows][];
+    for (int i = numRows - 1; i >= 0; i--) {
+      sortedRows[i] = _rows.poll();
+    }
+    return Arrays.asList(sortedRows);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index ff1ab76533..419a4df883 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -55,7 +55,6 @@ import org.roaringbitmap.RoaringBitmap;
  *   </li>
  * </ul>
  */
-@SuppressWarnings("rawtypes")
 public class SelectionOperatorService {
   private final QueryContext _queryContext;
   private final List<String> _selectionColumns;
@@ -65,7 +64,7 @@ public class SelectionOperatorService {
   private final PriorityQueue<Object[]> _rows;
 
   /**
-   * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Inter segment)
+   * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Broker side)
    *
    * @param queryContext Selection order-by query
    * @param dataSchema data schema.
@@ -78,6 +77,8 @@ public class SelectionOperatorService {
     _offset = queryContext.getOffset();
     _numRowsToKeep = _offset + queryContext.getLimit();
     assert queryContext.getOrderByExpressions() != null;
+    // TODO: Do not use type compatible comparator for performance since we don't support different data schema on
+    //       server side combine
     _rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
         SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema,
             _queryContext.isNullHandlingEnabled()));
@@ -95,6 +96,8 @@ public class SelectionOperatorService {
   /**
    * Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>.
    * (Broker side)
+   * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted
+   *       Can also consider adding a data table metadata to indicate whether the server side results are sorted
    */
   public void reduceWithOrdering(Collection<DataTable> dataTables, boolean nullHandlingEnabled) {
     for (DataTable dataTable : dataTables) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 8ef2d405c3..dcb3cec6e6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
@@ -40,6 +39,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.trace.Tracing;
@@ -192,37 +192,70 @@ public class SelectionOperatorUtils {
   /**
    * Merge two partial results for selection queries without <code>ORDER BY</code>. (Server side)
    *
-   * @param mergedRows partial results 1.
-   * @param rowsToMerge partial results 2.
+   * @param mergedBlock partial results 1.
+   * @param blockToMerge partial results 2.
    * @param selectionSize size of the selection.
    */
-  public static void mergeWithoutOrdering(Collection<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+  public static void mergeWithoutOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
       int selectionSize) {
-    Iterator<Object[]> iterator = rowsToMerge.iterator();
-    int numMergedRows = 0;
-    while (mergedRows.size() < selectionSize && iterator.hasNext()) {
-      mergedRows.add(iterator.next());
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
-      numMergedRows++;
+    List<Object[]> mergedRows = mergedBlock.getRows();
+    List<Object[]> rowsToMerge = blockToMerge.getRows();
+    int numRowsToMerge = Math.min(selectionSize - mergedRows.size(), rowsToMerge.size());
+    if (numRowsToMerge > 0) {
+      mergedRows.addAll(rowsToMerge.subList(0, numRowsToMerge));
     }
   }
 
   /**
    * Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
-   * TODO: Should use type compatible comparator to compare the rows
    *
-   * @param mergedRows partial results 1.
-   * @param rowsToMerge partial results 2.
+   * @param mergedBlock partial results 1 (sorted).
+   * @param blockToMerge partial results 2 (sorted).
    * @param maxNumRows maximum number of rows need to be stored.
    */
-  public static void mergeWithOrdering(PriorityQueue<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+  public static void mergeWithOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
       int maxNumRows) {
+    List<Object[]> sortedRows1 = mergedBlock.getRows();
+    List<Object[]> sortedRows2 = blockToMerge.getRows();
+    Comparator<? super Object[]> comparator = mergedBlock.getComparator();
+    assert comparator != null;
+    int numSortedRows1 = sortedRows1.size();
+    int numSortedRows2 = sortedRows2.size();
+    if (numSortedRows1 == 0) {
+      mergedBlock.setRows(sortedRows2);
+      return;
+    }
+    if (numSortedRows2 == 0 || (numSortedRows1 == maxNumRows
+        && comparator.compare(sortedRows1.get(numSortedRows1 - 1), sortedRows2.get(0)) <= 0)) {
+      return;
+    }
+    int numRowsToMerge = Math.min(numSortedRows1 + numSortedRows2, maxNumRows);
+    List<Object[]> mergedRows = new ArrayList<>(numRowsToMerge);
+    int i1 = 0;
+    int i2 = 0;
     int numMergedRows = 0;
-    for (Object[] row : rowsToMerge) {
-      addToPriorityQueue(row, mergedRows, maxNumRows);
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
-      numMergedRows++;
+    while (i1 < numSortedRows1 && i2 < numSortedRows2 && numMergedRows < numRowsToMerge) {
+      Object[] row1 = sortedRows1.get(i1);
+      Object[] row2 = sortedRows2.get(i2);
+      if (comparator.compare(row1, row2) <= 0) {
+        mergedRows.add(row1);
+        i1++;
+      } else {
+        mergedRows.add(row2);
+        i2++;
+      }
+      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows++);
+    }
+    if (numMergedRows < numRowsToMerge) {
+      if (i1 < numSortedRows1) {
+        assert i2 == numSortedRows2;
+        mergedRows.addAll(sortedRows1.subList(i1, i1 + numRowsToMerge - numMergedRows));
+      } else {
+        assert i1 == numSortedRows1;
+        mergedRows.addAll(sortedRows2.subList(i2, i2 + numRowsToMerge - numMergedRows));
+      }
     }
+    mergedBlock.setRows(mergedRows);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
index 4258317e76..5d25cc72a2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -38,17 +38,13 @@ public class OrderByComparatorFactory {
   }
 
   public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
-      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled) {
-    return getComparator(orderByExpressions, orderByExpressionMetadata, reverse, nullHandlingEnabled, 0,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled) {
+    return getComparator(orderByExpressions, orderByExpressionMetadata, nullHandlingEnabled, 0,
         orderByExpressions.size());
   }
 
-  /**
-   * @param reverse if false, the comparator will order in the direction indicated by the
-   * {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the opposite direction.
-   */
   public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
-      TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled, int from,
+      TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled, int from,
       int to) {
     Preconditions.checkArgument(to <= orderByExpressions.size(),
         "Trying to access %sth position of orderByExpressions with size %s", to, orderByExpressions.size());
@@ -76,13 +72,11 @@ public class OrderByComparatorFactory {
     FieldSpec.DataType[] storedTypes = new FieldSpec.DataType[numValuesToCompare];
     // Use multiplier -1 or 1 to control ascending/descending order
     int[] multipliers = new int[numValuesToCompare];
-    int ascMult = reverse ? -1 : 1;
-    int descMult = reverse ? 1 : -1;
     for (int i = 0; i < numValuesToCompare; i++) {
       int valueIndex = valueIndexList.get(i);
       valueIndices[i] = valueIndex;
       storedTypes[i] = orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
-      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? ascMult : descMult;
+      multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? 1 : -1;
     }
 
     if (nullHandlingEnabled) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index 88b81d5b88..f7a73647fa 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.commons.io.FileUtils;
@@ -224,12 +223,11 @@ public class SelectionCombineOperatorTest {
     SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    PriorityQueue<Object[]> selectionResult = combineResult.getRowsAsPriorityQueue();
-    assertNotNull(selectionResult);
-    assertEquals(selectionResult.size(), 10);
-    int expectedValue = 9;
-    while (!selectionResult.isEmpty()) {
-      assertEquals((int) selectionResult.poll()[0], expectedValue--);
+    List<Object[]> rows = combineResult.getRows();
+    assertNotNull(rows);
+    assertEquals(rows.size(), 10);
+    for (int i = 0; i < 10; i++) {
+      assertEquals((int) rows.get(i)[0], i);
     }
     // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
     // segment.
@@ -248,12 +246,12 @@ public class SelectionCombineOperatorTest {
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    selectionResult = combineResult.getRowsAsPriorityQueue();
-    assertNotNull(selectionResult);
-    assertEquals(selectionResult.size(), 10);
-    expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
-    while (!selectionResult.isEmpty()) {
-      assertEquals((int) selectionResult.poll()[0], expectedValue++);
+    rows = combineResult.getRows();
+    assertNotNull(rows);
+    assertEquals(rows.size(), 10);
+    int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
+    for (int i = 0; i < 10; i++) {
+      assertEquals((int) rows.get(i)[0], expectedValue - i);
     }
     // Should early-terminate after processing the result of the first segment. Each thread should process at most 1
     // segment.
@@ -272,9 +270,9 @@ public class SelectionCombineOperatorTest {
     combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
     assertEquals(combineResult.getDataSchema(),
         new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
-    selectionResult = combineResult.getRowsAsPriorityQueue();
-    assertNotNull(selectionResult);
-    assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+    rows = combineResult.getRows();
+    assertNotNull(rows);
+    assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
     // Should not early-terminate
     numDocsScanned = combineResult.getNumDocsScanned();
     assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
index b7e2a2b6de..b7ed828aa2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.core.operator.query;
 
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.PartiallySortedListBuilder;
 import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.TotallySortedListBuilder;
 import org.testng.annotations.Test;
@@ -67,7 +65,7 @@ public class LinearSelectionOrderByOperatorTest {
   public void testPartiallySortedListBuilder() {
     int maxNumRows = 10;
     Comparator<Object[]> partitionComparator = Comparator.comparingInt(row -> (Integer) row[0]);
-    Comparator<Object[]> unsortedComparator = (row1, row2) -> Integer.compare((Integer) row2[1], (Integer) row1[1]);
+    Comparator<Object[]> unsortedComparator = Comparator.comparingInt(row -> (Integer) row[1]);
 
     // Enough rows collected without tie rows
     PartiallySortedListBuilder listBuilder =
@@ -81,7 +79,7 @@ public class LinearSelectionOrderByOperatorTest {
     List<Object[]> rows = listBuilder.build();
     assertEquals(rows.size(), maxNumRows);
     for (int i = 0; i < maxNumRows; i++) {
-      assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+      assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
     }
 
     // Enough rows collected with tie rows
@@ -98,19 +96,13 @@ public class LinearSelectionOrderByOperatorTest {
     rows = listBuilder.build();
     assertEquals(rows.size(), maxNumRows);
     // For the last partition, should contain unsorted value 0 and 1
-    Set<Integer> unsortedValues = new HashSet<>();
     for (int i = 0; i < maxNumRows; i++) {
       if (i / 2 != lastPartitionValue) {
-        assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+        assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
       } else {
-        Object[] row = rows.get(i);
-        assertEquals(row[0], lastPartitionValue);
-        int unsortedValue = (int) row[1];
-        assertTrue(unsortedValue == 0 || unsortedValue == 1);
-        unsortedValues.add(unsortedValue);
+        assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2});
       }
     }
-    assertEquals(unsortedValues.size(), 2);
 
     // Not enough rows collected with tie rows
     listBuilder = new PartiallySortedListBuilder(maxNumRows, partitionComparator, unsortedComparator);
@@ -125,18 +117,12 @@ public class LinearSelectionOrderByOperatorTest {
     rows = listBuilder.build();
     assertEquals(rows.size(), maxNumRows);
     // For the last partition, should contain unsorted value 0 and 1
-    unsortedValues = new HashSet<>();
     for (int i = 0; i < maxNumRows; i++) {
       if (i / 2 != lastPartitionValue) {
-        assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+        assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
       } else {
-        Object[] row = rows.get(i);
-        assertEquals(row[0], lastPartitionValue);
-        int unsortedValue = (int) row[1];
-        assertTrue(unsortedValue == 0 || unsortedValue == 1);
-        unsortedValues.add(unsortedValue);
+        assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2});
       }
     }
-    assertEquals(unsortedValues.size(), 2);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index df95b6a5e1..cdae8a37e6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.PriorityQueue;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -38,6 +39,7 @@ import org.testng.annotations.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
@@ -168,13 +170,15 @@ public class SelectionOperatorServiceTest {
 
   @Test
   public void testCompatibleRowsMergeWithoutOrdering() {
-    ArrayList<Object[]> mergedRows = new ArrayList<>(2);
+    List<Object[]> mergedRows = new ArrayList<>(2);
     mergedRows.add(_row1);
     mergedRows.add(_row2);
-    Collection<Object[]> rowsToMerge = new ArrayList<>(2);
+    SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, mergedRows);
+    List<Object[]> rowsToMerge = new ArrayList<>(2);
     rowsToMerge.add(_compatibleRow1);
     rowsToMerge.add(_compatibleRow2);
-    SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, 3);
+    SelectionResultsBlock blockToMerge = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge);
+    SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, 3);
     assertEquals(mergedRows.size(), 3);
     assertSame(mergedRows.get(0), _row1);
     assertSame(mergedRows.get(1), _row2);
@@ -183,21 +187,23 @@ public class SelectionOperatorServiceTest {
 
   @Test
   public void testCompatibleRowsMergeWithOrdering() {
-    SelectionOperatorService selectionOperatorService = new SelectionOperatorService(_queryContext, _dataSchema);
-    PriorityQueue<Object[]> mergedRows = selectionOperatorService.getRows();
+    assertNotNull(_queryContext.getOrderByExpressions());
+    Comparator<Object[]> comparator =
+        SelectionOperatorUtils.getTypeCompatibleComparator(_queryContext.getOrderByExpressions(), _dataSchema,
+            _queryContext.isNullHandlingEnabled()).reversed();
     int maxNumRows = _queryContext.getOffset() + _queryContext.getLimit();
-    Collection<Object[]> rowsToMerge1 = new ArrayList<>(2);
-    rowsToMerge1.add(_row1);
-    rowsToMerge1.add(_row2);
-    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge1, maxNumRows);
-    Collection<Object[]> rowsToMerge2 = new ArrayList<>(2);
-    rowsToMerge2.add(_compatibleRow1);
-    rowsToMerge2.add(_compatibleRow2);
-    SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge2, maxNumRows);
+    SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, Collections.emptyList(), comparator);
+    List<Object[]> rowsToMerge1 = Arrays.asList(_row2, _row1);
+    SelectionResultsBlock blockToMerge1 = new SelectionResultsBlock(_dataSchema, rowsToMerge1, comparator);
+    SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge1, maxNumRows);
+    List<Object[]> rowsToMerge2 = Arrays.asList(_compatibleRow2, _compatibleRow1);
+    SelectionResultsBlock blockToMerge2 = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge2, comparator);
+    SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge2, maxNumRows);
+    List<Object[]> mergedRows = mergedBlock.getRows();
     assertEquals(mergedRows.size(), 3);
-    assertSame(mergedRows.poll(), _compatibleRow1);
-    assertSame(mergedRows.poll(), _row2);
-    assertSame(mergedRows.poll(), _compatibleRow2);
+    assertSame(mergedRows.get(0), _compatibleRow2);
+    assertSame(mergedRows.get(1), _row2);
+    assertSame(mergedRows.get(2), _compatibleRow1);
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 876baf7a98..a2c62d888d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
@@ -175,7 +174,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    selectionResult = (List<Object[]>) resultsBlock.getRows();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
     firstRow = selectionResult.get(0);
     assertEquals(firstRow.length, 3);
@@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
     assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
@@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
     assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
index 9e3abdc929..c28e8d9d9a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.core.operator.ExecutionStatistics;
@@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
     assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
@@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
         DataSchema.ColumnDataType.INT_ARRAY);
-    selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
     assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 9506bcbb9f..92d24fc72c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -108,7 +107,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("daysSinceEpoch"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("daysSinceEpoch")), ColumnDataType.INT);
 
-    PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
     for (Object[] row : selectionResult) {
       assertEquals(row.length, 1);
@@ -232,9 +231,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
@@ -256,9 +255,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 4);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
@@ -281,9 +280,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING});
-    PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
 
@@ -302,9 +301,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
 
@@ -322,9 +321,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 126164076);
@@ -343,9 +342,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 167572854);
@@ -364,9 +363,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     dataSchema = resultsBlock.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
     assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 2);
     assertEquals(lastRow[0], "gFuH");
     assertEquals(lastRow[1], 167572854);
@@ -386,9 +385,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
     // Unsorted column values should be the same as ordering by their own
@@ -410,9 +409,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"});
     assertEquals(dataSchema.getColumnDataTypes(),
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT});
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 3);
     assertEquals(lastRow[0], "gFuH");
     // Unsorted column values should be the same as ordering by their own
@@ -440,9 +439,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 11);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
@@ -465,9 +464,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 11);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
     assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
@@ -493,9 +492,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(selectionDataSchema.size(), 11);
     assertTrue(columnIndexMap.containsKey("column5"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 11);
     assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH");
 
@@ -515,9 +514,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertEquals(selectionDataSchema.size(), 11);
     assertTrue(columnIndexMap.containsKey("column5"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING);
-    selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 10);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(9);
     assertEquals(lastRow.length, 11);
     assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH");
   }
@@ -544,9 +543,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    List<Object[]> selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 12000);
-    Object[] lastRow = selectionResult.peek();
+    Object[] lastRow = selectionResult.get(11999);
     assertEquals(lastRow.length, 11);
     assertEquals((int) lastRow[columnIndexMap.get("column6")], 296467636);
     assertEquals((int) lastRow[columnIndexMap.get("column1")], 1715964282);
@@ -569,9 +568,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     assertTrue(columnIndexMap.containsKey("column1"));
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
     assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
-    selectionResult = resultsBlock.getRowsAsPriorityQueue();
+    selectionResult = resultsBlock.getRows();
     assertEquals(selectionResult.size(), 6129);
-    lastRow = selectionResult.peek();
+    lastRow = selectionResult.get(6128);
     assertEquals(lastRow.length, 11);
     assertEquals((int) lastRow[columnIndexMap.get("column6")], 499968041);
     assertEquals((int) lastRow[columnIndexMap.get("column1")], 335520083);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org