You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/09 23:17:57 UTC
[pinot] branch master updated: Enhance select order-by combine to use merge sort (#10357)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd8a7dcff Enhance select order-by combine to use merge sort (#10357)
6bd8a7dcff is described below
commit 6bd8a7dcfff67b6fb80f96a60e502d7a6df65336
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Mar 9 15:17:51 2023 -0800
Enhance select order-by combine to use merge sort (#10357)
---
.../blocks/results/AggregationResultsBlock.java | 3 +-
.../operator/blocks/results/BaseResultsBlock.java | 3 +-
.../blocks/results/DistinctResultsBlock.java | 2 +-
.../blocks/results/ExceptionResultsBlock.java | 4 +-
.../blocks/results/ExplainResultsBlock.java | 3 +-
.../blocks/results/GroupByResultsBlock.java | 2 +-
.../blocks/results/MetadataResultsBlock.java | 4 +-
.../blocks/results/SelectionResultsBlock.java | 47 ++++-----
.../combine/BaseSingleBlockCombineOperator.java | 2 +-
...xValueBasedSelectionOrderByCombineOperator.java | 21 +---
.../combine/merger/ResultsBlockMerger.java | 10 --
.../merger/SelectionOnlyResultsBlockMerger.java | 7 +-
.../merger/SelectionOrderByResultsBlockMerger.java | 15 +--
.../query/LinearSelectionOrderByOperator.java | 111 ++++++++++-----------
.../operator/query/SelectionOrderByOperator.java | 22 ++--
.../query/selection/SelectionOperatorService.java | 7 +-
.../query/selection/SelectionOperatorUtils.java | 69 +++++++++----
.../core/query/utils/OrderByComparatorFactory.java | 14 +--
.../combine/SelectionCombineOperatorTest.java | 30 +++---
.../query/LinearSelectionOrderByOperatorTest.java | 26 ++---
.../selection/SelectionOperatorServiceTest.java | 40 ++++----
...InnerSegmentSelectionMultiValueQueriesTest.java | 11 +-
...erSegmentSelectionMultiValueRawQueriesTest.java | 9 +-
...nnerSegmentSelectionSingleValueQueriesTest.java | 63 ++++++------
24 files changed, 245 insertions(+), 280 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 54fa0b9558..b2bb5b7308 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.math.BigDecimal;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable;
@@ -77,7 +76,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
}
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
return Collections.singletonList(_results.toArray());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index 9d119afce7..b611430d50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.operator.blocks.results;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -171,7 +170,7 @@ public abstract class BaseResultsBlock implements Block {
* Returns the rows for the results. Return {@code null} when the block only contains metadata.
*/
@Nullable
- public abstract Collection<Object[]> getRows(QueryContext queryContext);
+ public abstract List<Object[]> getRows(QueryContext queryContext);
/**
* Returns a data table without metadata or exception attached.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index 99a0f868c6..0cbeebd176 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -62,7 +62,7 @@ public class DistinctResultsBlock extends BaseResultsBlock {
}
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
List<Object[]> rows = new ArrayList<>(_distinctTable.size());
for (Record record : _distinctTable.getRecords()) {
rows.add(record.getValues());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 783c7c22a4..02abb39b84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks.results;
-import java.util.Collection;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
@@ -51,7 +51,7 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
@Nullable
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
return null;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
index bb3e71958b..5789c9ba76 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.operator.blocks.results;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -52,7 +51,7 @@ public class ExplainResultsBlock extends BaseResultsBlock {
}
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
List<Object[]> rows = new ArrayList<>(_entries.size());
for (ExplainEntry entry : _entries) {
rows.add(entry.toRow());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index f431f4bd6f..7e5057b4d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -149,7 +149,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
@Nullable
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
if (_table == null) {
return Collections.emptyList();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
index 3ef747d2a1..97b941f3d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks.results;
-import java.util.Collection;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -41,7 +41,7 @@ public class MetadataResultsBlock extends BaseResultsBlock {
@Nullable
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
return null;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 363cd1a4f5..b19c9898c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,12 +18,9 @@
*/
package org.apache.pinot.core.operator.blocks.results;
-import com.google.common.base.Preconditions;
import java.io.IOException;
-import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.PriorityQueue;
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -36,32 +33,37 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
*/
public class SelectionResultsBlock extends BaseResultsBlock {
private final DataSchema _dataSchema;
- private final Collection<Object[]> _rows;
private final Comparator<? super Object[]> _comparator;
+ private List<Object[]> _rows;
- public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
- this(dataSchema, rows, null);
- }
-
- public SelectionResultsBlock(DataSchema dataSchema, PriorityQueue<Object[]> rows) {
- this(dataSchema, rows, rows.comparator());
- }
-
- public SelectionResultsBlock(DataSchema dataSchema, Collection<Object[]> rows,
+ public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows,
@Nullable Comparator<? super Object[]> comparator) {
_dataSchema = dataSchema;
_rows = rows;
_comparator = comparator;
}
+ public SelectionResultsBlock(DataSchema dataSchema, List<Object[]> rows) {
+ this(dataSchema, rows, null);
+ }
+
public DataSchema getDataSchema() {
return _dataSchema;
}
- public Collection<Object[]> getRows() {
+ public List<Object[]> getRows() {
return _rows;
}
+ public void setRows(List<Object[]> rows) {
+ _rows = rows;
+ }
+
+ @Nullable
+ public Comparator<? super Object[]> getComparator() {
+ return _comparator;
+ }
+
@Override
public int getNumRows() {
return _rows.size();
@@ -73,25 +75,10 @@ public class SelectionResultsBlock extends BaseResultsBlock {
}
@Override
- public Collection<Object[]> getRows(QueryContext queryContext) {
+ public List<Object[]> getRows(QueryContext queryContext) {
return _rows;
}
- public SelectionResultsBlock convertToPriorityQueueBased() {
- if (_rows instanceof PriorityQueue) {
- return this;
- }
- Preconditions.checkState(_comparator != null, "No comparator specified in the results block");
- PriorityQueue<Object[]> result = new PriorityQueue<>(_comparator);
- result.addAll(_rows);
- return new SelectionResultsBlock(_dataSchema, result);
- }
-
- public PriorityQueue<Object[]> getRowsAsPriorityQueue() {
- Preconditions.checkState(_rows instanceof PriorityQueue, "The results block is not backed by a priority queue");
- return (PriorityQueue<Object[]>) _rows;
- }
-
@Override
public DataTable getDataTable(QueryContext queryContext)
throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
index adfeb710fc..7366a61b69 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java
@@ -148,7 +148,7 @@ public abstract class BaseSingleBlockCombineOperator<T extends BaseResultsBlock>
return blockToMerge;
}
if (mergedBlock == null) {
- mergedBlock = _resultsBlockMerger.convertToMergeableBlock((T) blockToMerge);
+ mergedBlock = (T) blockToMerge;
} else {
_resultsBlockMerger.mergeResultsBlocks(mergedBlock, (T) blockToMerge);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
index f33be5538e..666332d5ac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java
@@ -285,17 +285,16 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
return blockToMerge;
}
if (mergedBlock == null) {
- mergedBlock = convertToMergeableBlock((SelectionResultsBlock) blockToMerge);
+ mergedBlock = (SelectionResultsBlock) blockToMerge;
} else {
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
numBlocksMerged++;
// Update the boundary value if enough rows are collected
- PriorityQueue<Object[]> selectionResult = mergedBlock.getRowsAsPriorityQueue();
- if (selectionResult != null && selectionResult.size() == _numRowsToKeep) {
- assert selectionResult.peek() != null;
- _globalBoundaryValue.set((Comparable) selectionResult.peek()[0]);
+ List<Object[]> rows = mergedBlock.getRows();
+ if (rows.size() == _numRowsToKeep) {
+ _globalBoundaryValue.set((Comparable) rows.get(_numRowsToKeep - 1)[0]);
}
}
return mergedBlock;
@@ -315,17 +314,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator
QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
return;
}
-
- PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
- Collection<Object[]> rowsToMerge = blockToMerge.getRows();
- assert mergedRows != null && rowsToMerge != null;
- SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
- }
-
- protected SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
- // This may create a copy or return the same instance. Anyway, this operator is the owner of the
- // value now, so it can mutate it.
- return resultsBlock.convertToPriorityQueueBased();
+ SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
}
private static class MinMaxValueContext {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
index 0c77e66c93..318dfd2bd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/ResultsBlockMerger.java
@@ -44,14 +44,4 @@ public interface ResultsBlockMerger<T extends BaseResultsBlock> {
default boolean isQuerySatisfied(T resultsBlock) {
return false;
}
-
- /**
- * Converts the given results block into a mergeable results block.
- *
- * <p>This conversion is necessary if a block is used as the first argument for:
- * {@link ResultsBlockMerger#mergeResultsBlocks(BaseResultsBlock, BaseResultsBlock)}.
- */
- default T convertToMergeableBlock(T resultsBlock) {
- return resultsBlock;
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
index 54b6185be1..aec95823c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOnlyResultsBlockMerger.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.operator.combine.merger;
-import java.util.Collection;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -56,10 +55,6 @@ public class SelectionOnlyResultsBlockMerger implements ResultsBlockMerger<Selec
QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
return;
}
-
- Collection<Object[]> mergedRows = mergedBlock.getRows();
- Collection<Object[]> rowsToMerge = blockToMerge.getRows();
- assert mergedRows != null && rowsToMerge != null;
- SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
+ SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
index 569b2b8e3b..a5483853cb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/merger/SelectionOrderByResultsBlockMerger.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.core.operator.combine.merger;
-import java.util.Collection;
-import java.util.PriorityQueue;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -52,17 +50,6 @@ public class SelectionOrderByResultsBlockMerger implements ResultsBlockMerger<Se
QueryException.getException(QueryException.MERGE_RESPONSE_ERROR, errorMessage));
return;
}
-
- PriorityQueue<Object[]> mergedRows = mergedBlock.getRowsAsPriorityQueue();
- Collection<Object[]> rowsToMerge = blockToMerge.getRows();
- assert mergedRows != null && rowsToMerge != null;
- SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge, _numRowsToKeep);
- }
-
- @Override
- public SelectionResultsBlock convertToMergeableBlock(SelectionResultsBlock resultsBlock) {
- // This may create a copy or return the same instance. Anyway, this operator is the owner of the
- // value now, so it can mutate it.
- return resultsBlock.convertToPriorityQueueBased();
+ SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge, _numRowsToKeep);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
index e9fe801262..c86e5a114d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperator.java
@@ -74,13 +74,8 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
protected final List<OrderByExpressionContext> _orderByExpressions;
protected final TransformResultMetadata[] _expressionsMetadata;
protected final int _numRowsToKeep;
- private final Supplier<ListBuilder> _listBuilderSupplier;
- protected boolean _used = false;
- /**
- * The comparator used to build the resulting {@link SelectionResultsBlock}, which sorts rows in reverse order to the
- * one specified in the query.
- */
- protected Comparator<Object[]> _comparator;
+ protected final Comparator<Object[]> _comparator;
+ protected final Supplier<ListBuilder> _listBuilderSupplier;
/**
* @param expressions Order-by expressions must be at the head of the list.
@@ -107,21 +102,20 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
}
_numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
+ _comparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled);
if (_toSort.isEmpty()) {
_listBuilderSupplier = () -> new TotallySortedListBuilder(_numRowsToKeep);
} else {
Comparator<Object[]> sortedComparator =
- OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, false, _nullHandlingEnabled,
- 0, numSortedExpressions);
+ OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled, 0,
+ numSortedExpressions);
Comparator<Object[]> unsortedComparator =
- OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled,
+ OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, _nullHandlingEnabled,
numSortedExpressions, numOrderByExpressions);
_listBuilderSupplier = () -> new PartiallySortedListBuilder(_numRowsToKeep, sortedComparator, unsortedComparator);
}
-
- _comparator =
- OrderByComparatorFactory.getComparator(_orderByExpressions, _expressionsMetadata, true, _nullHandlingEnabled);
}
@Override
@@ -221,17 +215,7 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
@Override
protected SelectionResultsBlock getNextBlock() {
- Preconditions.checkState(!_used, "nextBlock was called more than once");
- _used = true;
- List<Object[]> list = fetch(_listBuilderSupplier);
-
- DataSchema dataSchema = createDataSchema();
-
- if (list.size() > _numRowsToKeep) {
- list = new ArrayList<>(list.subList(0, _numRowsToKeep));
- }
-
- return new SelectionResultsBlock(dataSchema, list, _comparator);
+ return new SelectionResultsBlock(createDataSchema(), fetch(_listBuilderSupplier), _comparator);
}
protected DataSchema createDataSchema() {
@@ -335,25 +319,28 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
*/
@VisibleForTesting
static class PartiallySortedListBuilder implements ListBuilder {
- /**
- * A list with all the elements that have been already sorted.
- */
- private final ArrayList<Object[]> _sorted;
- /**
- * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows.
- */
- private PriorityQueue<Object[]> _lastPartitionQueue;
+
+ private final int _maxNumRows;
+
/**
* The comparator that defines the partitions and the one that impose in which order add has to be called.
*/
private final Comparator<Object[]> _partitionComparator;
+
/**
- * The comparator that sorts different rows on each partition, which sorts rows in reverse order to the one
- * specified in the query.
+ * The comparator that sorts different rows on each partition.
*/
private final Comparator<Object[]> _unsortedComparator;
- private final int _maxNumRows;
+ /**
+ * List of rows, where the first _numSortedRows are sorted.
+ */
+ private final ArrayList<Object[]> _rows;
+
+ /**
+ * This attribute is used to store the last partition when the builder already contains {@link #_maxNumRows} rows.
+ */
+ private PriorityQueue<Object[]> _lastPartitionQueue;
private Object[] _lastPartitionRow;
private int _numSortedRows;
@@ -361,50 +348,52 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
public PartiallySortedListBuilder(int maxNumRows, Comparator<Object[]> partitionComparator,
Comparator<Object[]> unsortedComparator) {
_maxNumRows = maxNumRows;
- _sorted = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
_partitionComparator = partitionComparator;
_unsortedComparator = unsortedComparator;
+ _rows = new ArrayList<>(Integer.min(maxNumRows, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY));
}
@Override
public boolean add(Object[] row) {
if (_lastPartitionRow == null) {
_lastPartitionRow = row;
- _sorted.add(row);
+ _rows.add(row);
return false;
}
- int cmp = _partitionComparator.compare(row, _lastPartitionRow);
- if (cmp < 0) {
- throw new IllegalArgumentException(
- "Row with docId " + _sorted.size() + " is not sorted compared to the previous one");
- }
+ int compareResult = _partitionComparator.compare(row, _lastPartitionRow);
+ Preconditions.checkState(compareResult >= 0, "Rows are not sorted");
- boolean newPartition = cmp > 0;
- if (_sorted.size() < _maxNumRows) {
+ boolean newPartition = compareResult > 0;
+ int numRows = _rows.size();
+ if (numRows < _maxNumRows) {
// we don't have enough rows yet
if (newPartition) {
_lastPartitionRow = row;
- _numSortedRows = _sorted.size();
+ if (numRows - _numSortedRows > 1) {
+ _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator);
+ }
+ _numSortedRows = numRows;
}
// just add the new row to the result list
- _sorted.add(row);
+ _rows.add(row);
return false;
}
// enough rows have been collected
- assert _sorted.size() == _maxNumRows;
- if (newPartition) { // and the new element belongs to a new partition, so we can just ignore it
+ assert numRows == _maxNumRows;
+ if (newPartition) {
+ // new element belongs to a new partition, so we can just ignore it
return true;
}
// new element doesn't belong to a new partition, so we may need to add it
- if (_lastPartitionQueue == null) { // we have exactly _numRows rows, and the new belongs to the last partition
+ if (_lastPartitionQueue == null) {
// we need to prepare the priority queue
- int numRowsInPriorityQueue = _maxNumRows - _numSortedRows;
- _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator);
- _lastPartitionQueue.addAll(_sorted.subList(_numSortedRows, _maxNumRows));
+ int numRowsInPriorityQueue = numRows - _numSortedRows;
+ _lastPartitionQueue = new PriorityQueue<>(numRowsInPriorityQueue, _unsortedComparator.reversed());
+ _lastPartitionQueue.addAll(_rows.subList(_numSortedRows, numRows));
}
// add the new element if it is lower than the greatest element stored in the partition
- if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) > 0) {
+ if (_unsortedComparator.compare(row, _lastPartitionQueue.peek()) < 0) {
_lastPartitionQueue.poll();
_lastPartitionQueue.offer(row);
}
@@ -413,14 +402,18 @@ public abstract class LinearSelectionOrderByOperator extends BaseOperator<Select
@Override
public List<Object[]> build() {
- if (_lastPartitionQueue != null) {
- assert _lastPartitionQueue.size() == _maxNumRows - _numSortedRows;
- Iterator<Object[]> lastPartitionIt = _lastPartitionQueue.iterator();
- for (int i = _numSortedRows; i < _maxNumRows; i++) {
- _sorted.set(i, lastPartitionIt.next());
+ int numRows = _rows.size();
+ if (_lastPartitionQueue == null) {
+ if (numRows - _numSortedRows > 1) {
+ _rows.subList(_numSortedRows, numRows).sort(_unsortedComparator);
+ }
+ } else {
+ assert numRows == _maxNumRows && _lastPartitionQueue.size() == numRows - _numSortedRows;
+ for (int i = numRows - 1; i >= _numSortedRows; i--) {
+ _rows.set(i, _lastPartitionQueue.poll());
}
}
- return _sorted;
+ return _rows;
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
index 107cbe0d7b..9c6702a481 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.query;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -80,6 +81,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
private final List<OrderByExpressionContext> _orderByExpressions;
private final TransformResultMetadata[] _orderByExpressionMetadata;
private final int _numRowsToKeep;
+ private final Comparator<Object[]> _comparator;
private final PriorityQueue<Object[]> _rows;
private int _numDocsScanned = 0;
@@ -103,11 +105,10 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
}
_numRowsToKeep = queryContext.getOffset() + queryContext.getLimit();
- Comparator<Object[]> comparator =
- OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, true,
- _nullHandlingEnabled);
+ _comparator =
+ OrderByComparatorFactory.getComparator(_orderByExpressions, _orderByExpressionMetadata, _nullHandlingEnabled);
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
- comparator);
+ _comparator.reversed());
}
@Override
@@ -183,7 +184,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
}
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- return new SelectionResultsBlock(dataSchema, _rows);
+ return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator);
}
/**
@@ -303,7 +304,16 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock
}
DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
- return new SelectionResultsBlock(dataSchema, _rows);
+ return new SelectionResultsBlock(dataSchema, getSortedRows(), _comparator);
+ }
+
+ private List<Object[]> getSortedRows() {
+ int numRows = _rows.size();
+ Object[][] sortedRows = new Object[numRows][];
+ for (int i = numRows - 1; i >= 0; i--) {
+ sortedRows[i] = _rows.poll();
+ }
+ return Arrays.asList(sortedRows);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
index ff1ab76533..419a4df883 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorService.java
@@ -55,7 +55,6 @@ import org.roaringbitmap.RoaringBitmap;
* </li>
* </ul>
*/
-@SuppressWarnings("rawtypes")
public class SelectionOperatorService {
private final QueryContext _queryContext;
private final List<String> _selectionColumns;
@@ -65,7 +64,7 @@ public class SelectionOperatorService {
private final PriorityQueue<Object[]> _rows;
/**
- * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Inter segment)
+ * Constructor for <code>SelectionOperatorService</code> with {@link DataSchema}. (Broker side)
*
* @param queryContext Selection order-by query
* @param dataSchema data schema.
@@ -78,6 +77,8 @@ public class SelectionOperatorService {
_offset = queryContext.getOffset();
_numRowsToKeep = _offset + queryContext.getLimit();
assert queryContext.getOrderByExpressions() != null;
+ // TODO: Do not use type compatible comparator for performance since we don't support different data schema on
+ // server side combine
_rows = new PriorityQueue<>(Math.min(_numRowsToKeep, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY),
SelectionOperatorUtils.getTypeCompatibleComparator(queryContext.getOrderByExpressions(), _dataSchema,
_queryContext.isNullHandlingEnabled()));
@@ -95,6 +96,8 @@ public class SelectionOperatorService {
/**
* Reduces a collection of {@link DataTable}s to selection rows for selection queries with <code>ORDER BY</code>.
* (Broker side)
+ * TODO: Do merge sort after releasing 0.13.0 when server side results are sorted
+ * Can also consider adding a data table metadata to indicate whether the server side results are sorted
*/
public void reduceWithOrdering(Collection<DataTable> dataTables, boolean nullHandlingEnabled) {
for (DataTable dataTable : dataTables) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 8ef2d405c3..dcb3cec6e6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
@@ -40,6 +39,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.trace.Tracing;
@@ -192,37 +192,70 @@ public class SelectionOperatorUtils {
/**
* Merge two partial results for selection queries without <code>ORDER BY</code>. (Server side)
*
- * @param mergedRows partial results 1.
- * @param rowsToMerge partial results 2.
+ * @param mergedBlock partial results 1.
+ * @param blockToMerge partial results 2.
* @param selectionSize size of the selection.
*/
- public static void mergeWithoutOrdering(Collection<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+ public static void mergeWithoutOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
int selectionSize) {
- Iterator<Object[]> iterator = rowsToMerge.iterator();
- int numMergedRows = 0;
- while (mergedRows.size() < selectionSize && iterator.hasNext()) {
- mergedRows.add(iterator.next());
- Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
- numMergedRows++;
+ List<Object[]> mergedRows = mergedBlock.getRows();
+ List<Object[]> rowsToMerge = blockToMerge.getRows();
+ int numRowsToMerge = Math.min(selectionSize - mergedRows.size(), rowsToMerge.size());
+ if (numRowsToMerge > 0) {
+ mergedRows.addAll(rowsToMerge.subList(0, numRowsToMerge));
}
}
/**
* Merge two partial results for selection queries with <code>ORDER BY</code>. (Server side)
- * TODO: Should use type compatible comparator to compare the rows
*
- * @param mergedRows partial results 1.
- * @param rowsToMerge partial results 2.
+ * @param mergedBlock partial results 1 (sorted).
+ * @param blockToMerge partial results 2 (sorted).
* @param maxNumRows maximum number of rows need to be stored.
*/
- public static void mergeWithOrdering(PriorityQueue<Object[]> mergedRows, Collection<Object[]> rowsToMerge,
+ public static void mergeWithOrdering(SelectionResultsBlock mergedBlock, SelectionResultsBlock blockToMerge,
int maxNumRows) {
+ List<Object[]> sortedRows1 = mergedBlock.getRows();
+ List<Object[]> sortedRows2 = blockToMerge.getRows();
+ Comparator<? super Object[]> comparator = mergedBlock.getComparator();
+ assert comparator != null;
+ int numSortedRows1 = sortedRows1.size();
+ int numSortedRows2 = sortedRows2.size();
+ if (numSortedRows1 == 0) {
+ mergedBlock.setRows(sortedRows2);
+ return;
+ }
+ if (numSortedRows2 == 0 || (numSortedRows1 == maxNumRows
+ && comparator.compare(sortedRows1.get(numSortedRows1 - 1), sortedRows2.get(0)) <= 0)) {
+ return;
+ }
+ int numRowsToMerge = Math.min(numSortedRows1 + numSortedRows2, maxNumRows);
+ List<Object[]> mergedRows = new ArrayList<>(numRowsToMerge);
+ int i1 = 0;
+ int i2 = 0;
int numMergedRows = 0;
- for (Object[] row : rowsToMerge) {
- addToPriorityQueue(row, mergedRows, maxNumRows);
- Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows);
- numMergedRows++;
+ while (i1 < numSortedRows1 && i2 < numSortedRows2 && numMergedRows < numRowsToMerge) {
+ Object[] row1 = sortedRows1.get(i1);
+ Object[] row2 = sortedRows2.get(i2);
+ if (comparator.compare(row1, row2) <= 0) {
+ mergedRows.add(row1);
+ i1++;
+ } else {
+ mergedRows.add(row2);
+ i2++;
+ }
+ Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(numMergedRows++);
+ }
+ if (numMergedRows < numRowsToMerge) {
+ if (i1 < numSortedRows1) {
+ assert i2 == numSortedRows2;
+ mergedRows.addAll(sortedRows1.subList(i1, i1 + numRowsToMerge - numMergedRows));
+ } else {
+ assert i1 == numSortedRows1;
+ mergedRows.addAll(sortedRows2.subList(i2, i2 + numRowsToMerge - numMergedRows));
+ }
}
+ mergedBlock.setRows(mergedRows);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
index 4258317e76..5d25cc72a2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/utils/OrderByComparatorFactory.java
@@ -38,17 +38,13 @@ public class OrderByComparatorFactory {
}
public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
- TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled) {
- return getComparator(orderByExpressions, orderByExpressionMetadata, reverse, nullHandlingEnabled, 0,
+ TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled) {
+ return getComparator(orderByExpressions, orderByExpressionMetadata, nullHandlingEnabled, 0,
orderByExpressions.size());
}
- /**
- * @param reverse if false, the comparator will order in the direction indicated by the
- * {@link OrderByExpressionContext#isAsc()}. Otherwise, it will be in the opposite direction.
- */
public static Comparator<Object[]> getComparator(List<OrderByExpressionContext> orderByExpressions,
- TransformResultMetadata[] orderByExpressionMetadata, boolean reverse, boolean nullHandlingEnabled, int from,
+ TransformResultMetadata[] orderByExpressionMetadata, boolean nullHandlingEnabled, int from,
int to) {
Preconditions.checkArgument(to <= orderByExpressions.size(),
"Trying to access %sth position of orderByExpressions with size %s", to, orderByExpressions.size());
@@ -76,13 +72,11 @@ public class OrderByComparatorFactory {
FieldSpec.DataType[] storedTypes = new FieldSpec.DataType[numValuesToCompare];
// Use multiplier -1 or 1 to control ascending/descending order
int[] multipliers = new int[numValuesToCompare];
- int ascMult = reverse ? -1 : 1;
- int descMult = reverse ? 1 : -1;
for (int i = 0; i < numValuesToCompare; i++) {
int valueIndex = valueIndexList.get(i);
valueIndices[i] = valueIndex;
storedTypes[i] = orderByExpressionMetadata[valueIndex].getDataType().getStoredType();
- multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? ascMult : descMult;
+ multipliers[i] = orderByExpressions.get(valueIndex).isAsc() ? 1 : -1;
}
if (nullHandlingEnabled) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
index 88b81d5b88..f7a73647fa 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/SelectionCombineOperatorTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
@@ -224,12 +223,11 @@ public class SelectionCombineOperatorTest {
SelectionResultsBlock combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- PriorityQueue<Object[]> selectionResult = combineResult.getRowsAsPriorityQueue();
- assertNotNull(selectionResult);
- assertEquals(selectionResult.size(), 10);
- int expectedValue = 9;
- while (!selectionResult.isEmpty()) {
- assertEquals((int) selectionResult.poll()[0], expectedValue--);
+ List<Object[]> rows = combineResult.getRows();
+ assertNotNull(rows);
+ assertEquals(rows.size(), 10);
+ for (int i = 0; i < 10; i++) {
+ assertEquals((int) rows.get(i)[0], i);
}
// Should early-terminate after processing the result of the first segment. Each thread should process at most 1
// segment.
@@ -248,12 +246,12 @@ public class SelectionCombineOperatorTest {
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- selectionResult = combineResult.getRowsAsPriorityQueue();
- assertNotNull(selectionResult);
- assertEquals(selectionResult.size(), 10);
- expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 40;
- while (!selectionResult.isEmpty()) {
- assertEquals((int) selectionResult.poll()[0], expectedValue++);
+ rows = combineResult.getRows();
+ assertNotNull(rows);
+ assertEquals(rows.size(), 10);
+ int expectedValue = NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT / 2 + 49;
+ for (int i = 0; i < 10; i++) {
+ assertEquals((int) rows.get(i)[0], expectedValue - i);
}
// Should early-terminate after processing the result of the first segment. Each thread should process at most 1
// segment.
@@ -272,9 +270,9 @@ public class SelectionCombineOperatorTest {
combineResult = getCombineResult("SELECT * FROM testTable ORDER BY intColumn DESC LIMIT 10000");
assertEquals(combineResult.getDataSchema(),
new DataSchema(new String[]{INT_COLUMN}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}));
- selectionResult = combineResult.getRowsAsPriorityQueue();
- assertNotNull(selectionResult);
- assertEquals(selectionResult.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
+ rows = combineResult.getRows();
+ assertNotNull(rows);
+ assertEquals(rows.size(), NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
// Should not early-terminate
numDocsScanned = combineResult.getNumDocsScanned();
assertEquals(numDocsScanned, NUM_SEGMENTS * NUM_RECORDS_PER_SEGMENT);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
index b7e2a2b6de..b7ed828aa2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/query/LinearSelectionOrderByOperatorTest.java
@@ -19,9 +19,7 @@
package org.apache.pinot.core.operator.query;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.PartiallySortedListBuilder;
import org.apache.pinot.core.operator.query.LinearSelectionOrderByOperator.TotallySortedListBuilder;
import org.testng.annotations.Test;
@@ -67,7 +65,7 @@ public class LinearSelectionOrderByOperatorTest {
public void testPartiallySortedListBuilder() {
int maxNumRows = 10;
Comparator<Object[]> partitionComparator = Comparator.comparingInt(row -> (Integer) row[0]);
- Comparator<Object[]> unsortedComparator = (row1, row2) -> Integer.compare((Integer) row2[1], (Integer) row1[1]);
+ Comparator<Object[]> unsortedComparator = Comparator.comparingInt(row -> (Integer) row[1]);
// Enough rows collected without tie rows
PartiallySortedListBuilder listBuilder =
@@ -81,7 +79,7 @@ public class LinearSelectionOrderByOperatorTest {
List<Object[]> rows = listBuilder.build();
assertEquals(rows.size(), maxNumRows);
for (int i = 0; i < maxNumRows; i++) {
- assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
}
// Enough rows collected with tie rows
@@ -98,19 +96,13 @@ public class LinearSelectionOrderByOperatorTest {
rows = listBuilder.build();
assertEquals(rows.size(), maxNumRows);
// For the last partition, should contain unsorted value 0 and 1
- Set<Integer> unsortedValues = new HashSet<>();
for (int i = 0; i < maxNumRows; i++) {
if (i / 2 != lastPartitionValue) {
- assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
} else {
- Object[] row = rows.get(i);
- assertEquals(row[0], lastPartitionValue);
- int unsortedValue = (int) row[1];
- assertTrue(unsortedValue == 0 || unsortedValue == 1);
- unsortedValues.add(unsortedValue);
+ assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2});
}
}
- assertEquals(unsortedValues.size(), 2);
// Not enough rows collected with tie rows
listBuilder = new PartiallySortedListBuilder(maxNumRows, partitionComparator, unsortedComparator);
@@ -125,18 +117,12 @@ public class LinearSelectionOrderByOperatorTest {
rows = listBuilder.build();
assertEquals(rows.size(), maxNumRows);
// For the last partition, should contain unsorted value 0 and 1
- unsortedValues = new HashSet<>();
for (int i = 0; i < maxNumRows; i++) {
if (i / 2 != lastPartitionValue) {
- assertEquals(rows.get(i), new Object[]{i / 2, maxNumRows - i});
+ assertEquals(rows.get(i), new Object[]{i / 2, i % 2 == 0 ? maxNumRows - i - 1 : maxNumRows - i + 1});
} else {
- Object[] row = rows.get(i);
- assertEquals(row[0], lastPartitionValue);
- int unsortedValue = (int) row[1];
- assertTrue(unsortedValue == 0 || unsortedValue == 1);
- unsortedValues.add(unsortedValue);
+ assertEquals(rows.get(i), new Object[]{lastPartitionValue, i % 2});
}
}
- assertEquals(unsortedValues.size(), 2);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
index df95b6a5e1..cdae8a37e6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/selection/SelectionOperatorServiceTest.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
-import java.util.PriorityQueue;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.segment.spi.IndexSegment;
@@ -38,6 +39,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -168,13 +170,15 @@ public class SelectionOperatorServiceTest {
@Test
public void testCompatibleRowsMergeWithoutOrdering() {
- ArrayList<Object[]> mergedRows = new ArrayList<>(2);
+ List<Object[]> mergedRows = new ArrayList<>(2);
mergedRows.add(_row1);
mergedRows.add(_row2);
- Collection<Object[]> rowsToMerge = new ArrayList<>(2);
+ SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, mergedRows);
+ List<Object[]> rowsToMerge = new ArrayList<>(2);
rowsToMerge.add(_compatibleRow1);
rowsToMerge.add(_compatibleRow2);
- SelectionOperatorUtils.mergeWithoutOrdering(mergedRows, rowsToMerge, 3);
+ SelectionResultsBlock blockToMerge = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge);
+ SelectionOperatorUtils.mergeWithoutOrdering(mergedBlock, blockToMerge, 3);
assertEquals(mergedRows.size(), 3);
assertSame(mergedRows.get(0), _row1);
assertSame(mergedRows.get(1), _row2);
@@ -183,21 +187,23 @@ public class SelectionOperatorServiceTest {
@Test
public void testCompatibleRowsMergeWithOrdering() {
- SelectionOperatorService selectionOperatorService = new SelectionOperatorService(_queryContext, _dataSchema);
- PriorityQueue<Object[]> mergedRows = selectionOperatorService.getRows();
+ assertNotNull(_queryContext.getOrderByExpressions());
+ Comparator<Object[]> comparator =
+ SelectionOperatorUtils.getTypeCompatibleComparator(_queryContext.getOrderByExpressions(), _dataSchema,
+ _queryContext.isNullHandlingEnabled()).reversed();
int maxNumRows = _queryContext.getOffset() + _queryContext.getLimit();
- Collection<Object[]> rowsToMerge1 = new ArrayList<>(2);
- rowsToMerge1.add(_row1);
- rowsToMerge1.add(_row2);
- SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge1, maxNumRows);
- Collection<Object[]> rowsToMerge2 = new ArrayList<>(2);
- rowsToMerge2.add(_compatibleRow1);
- rowsToMerge2.add(_compatibleRow2);
- SelectionOperatorUtils.mergeWithOrdering(mergedRows, rowsToMerge2, maxNumRows);
+ SelectionResultsBlock mergedBlock = new SelectionResultsBlock(_dataSchema, Collections.emptyList(), comparator);
+ List<Object[]> rowsToMerge1 = Arrays.asList(_row2, _row1);
+ SelectionResultsBlock blockToMerge1 = new SelectionResultsBlock(_dataSchema, rowsToMerge1, comparator);
+ SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge1, maxNumRows);
+ List<Object[]> rowsToMerge2 = Arrays.asList(_compatibleRow2, _compatibleRow1);
+ SelectionResultsBlock blockToMerge2 = new SelectionResultsBlock(_compatibleDataSchema, rowsToMerge2, comparator);
+ SelectionOperatorUtils.mergeWithOrdering(mergedBlock, blockToMerge2, maxNumRows);
+ List<Object[]> mergedRows = mergedBlock.getRows();
assertEquals(mergedRows.size(), 3);
- assertSame(mergedRows.poll(), _compatibleRow1);
- assertSame(mergedRows.poll(), _row2);
- assertSame(mergedRows.poll(), _compatibleRow2);
+ assertSame(mergedRows.get(0), _compatibleRow2);
+ assertSame(mergedRows.get(1), _row2);
+ assertSame(mergedRows.get(2), _compatibleRow1);
}
@Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 876baf7a98..a2c62d888d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
@@ -175,7 +174,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- selectionResult = (List<Object[]>) resultsBlock.getRows();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
firstRow = selectionResult.get(0);
assertEquals(firstRow.length, 3);
@@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
@@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
index 9e3abdc929..c28e8d9d9a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueRawQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
@@ -203,9 +202,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{1252});
@@ -228,9 +227,9 @@ public class InnerSegmentSelectionMultiValueRawQueriesTest extends BaseMultiValu
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), DataSchema.ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")),
DataSchema.ColumnDataType.INT_ARRAY);
- selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals((String) lastRow[columnIndexMap.get("column5")], "AKXcXcIqsqOJFsdwxZ");
assertEquals(lastRow[columnIndexMap.get("column6")], new int[]{2147483647});
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 9506bcbb9f..92d24fc72c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.queries;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -108,7 +107,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("daysSinceEpoch"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("daysSinceEpoch")), ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
for (Object[] row : selectionResult) {
assertEquals(row.length, 1);
@@ -232,9 +231,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
@@ -256,9 +255,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 4);
assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
@@ -281,9 +280,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING});
- PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
@@ -302,9 +301,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column1", "column11"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.STRING});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
@@ -322,9 +321,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 126164076);
@@ -343,9 +342,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 167572854);
@@ -364,9 +363,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
dataSchema = resultsBlock.getDataSchema();
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "daysSinceEpoch"});
assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 2);
assertEquals(lastRow[0], "gFuH");
assertEquals(lastRow[1], 167572854);
@@ -386,9 +385,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
// Unsorted column values should be the same as ordering by their own
@@ -410,9 +409,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(dataSchema.getColumnNames(), new String[]{"column5", "column6", "column1"});
assertEquals(dataSchema.getColumnDataTypes(),
new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.INT});
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 3);
assertEquals(lastRow[0], "gFuH");
// Unsorted column values should be the same as ordering by their own
@@ -440,9 +439,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 11);
assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 10542595);
@@ -465,9 +464,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 11);
assertEquals(((Integer) lastRow[columnIndexMap.get("column6")]).intValue(), 6043515);
assertEquals(((Integer) lastRow[columnIndexMap.get("column1")]).intValue(), 462769197);
@@ -493,9 +492,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(selectionDataSchema.size(), 11);
assertTrue(columnIndexMap.containsKey("column5"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING);
- PriorityQueue<Object[]> selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 11);
assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH");
@@ -515,9 +514,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertEquals(selectionDataSchema.size(), 11);
assertTrue(columnIndexMap.containsKey("column5"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column5")), ColumnDataType.STRING);
- selectionResult = resultsBlock.convertToPriorityQueueBased().getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 10);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(9);
assertEquals(lastRow.length, 11);
assertEquals((lastRow[columnIndexMap.get("column5")]), "gFuH");
}
@@ -544,9 +543,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- PriorityQueue<Object[]> selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ List<Object[]> selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 12000);
- Object[] lastRow = selectionResult.peek();
+ Object[] lastRow = selectionResult.get(11999);
assertEquals(lastRow.length, 11);
assertEquals((int) lastRow[columnIndexMap.get("column6")], 296467636);
assertEquals((int) lastRow[columnIndexMap.get("column1")], 1715964282);
@@ -569,9 +568,9 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
assertTrue(columnIndexMap.containsKey("column1"));
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column6")), ColumnDataType.INT);
assertEquals(selectionDataSchema.getColumnDataType(columnIndexMap.get("column1")), ColumnDataType.INT);
- selectionResult = resultsBlock.getRowsAsPriorityQueue();
+ selectionResult = resultsBlock.getRows();
assertEquals(selectionResult.size(), 6129);
- lastRow = selectionResult.peek();
+ lastRow = selectionResult.get(6128);
assertEquals(lastRow.length, 11);
assertEquals((int) lastRow[columnIndexMap.get("column6")], 499968041);
assertEquals((int) lastRow[columnIndexMap.get("column1")], 335520083);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org