You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/05/30 21:29:46 UTC

[pinot] branch master updated: [multistage][bugfix] order by limit is capped at 10_000 (#10809)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a7ca904f4 [multistage][bugfix] order by limit is capped at 10_000 (#10809)
6a7ca904f4 is described below

commit 6a7ca904f497c75d91ae0700ae551796f4b0606f
Author: Rong Rong <ro...@apache.org>
AuthorDate: Tue May 30 14:29:39 2023 -0700

    [multistage][bugfix] order by limit is capped at 10_000 (#10809)
---
 .../apache/pinot/query/runtime/operator/SortOperator.java    | 12 ++++++++----
 .../pinot/query/runtime/operator/SortOperatorTest.java       |  3 ++-
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index 1350242508..908bfb9406 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.operator.utils.SortUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,13 +59,14 @@ public class SortOperator extends MultiStageOperator {
       List<RexExpression> collationKeys, List<RelFieldCollation.Direction> collationDirections, int fetch, int offset,
       DataSchema dataSchema, boolean isInputSorted) {
     this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, isInputSorted,
-        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY);
+        SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
+        CommonConstants.Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
   }
 
   @VisibleForTesting
   SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List<RexExpression> collationKeys,
       List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema,
-      boolean isInputSorted, int defaultHolderCapacity) {
+      boolean isInputSorted, int defaultHolderCapacity, int defaultResponseLimit) {
     super(context);
     _upstreamOperator = upstreamOperator;
     _fetch = fetch;
@@ -72,7 +74,9 @@ public class SortOperator extends MultiStageOperator {
     _dataSchema = dataSchema;
     _upstreamErrorBlock = null;
     _isSortedBlockConstructed = false;
-    _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultHolderCapacity;
+    // When no limit (fetch) is set, default numRowsToKeep to the Broker's default query response limit.
+    // TODO: make this default behavior configurable.
+    _numRowsToKeep = _fetch > 0 ? _fetch + _offset : defaultResponseLimit;
     // Under the following circumstances, the SortOperator is a simple selection with row trim on limit & offset:
     // - There are no collationKeys
     // - 'isInputSorted' is set to true indicating that the data was already sorted
@@ -82,7 +86,7 @@ public class SortOperator extends MultiStageOperator {
     } else {
       // Use the opposite direction as specified by the collation directions since we need the PriorityQueue to decide
       // which elements to keep and which to remove based on the limits.
-      _priorityQueue = new PriorityQueue<>(_numRowsToKeep,
+      _priorityQueue = new PriorityQueue<>(Math.min(defaultHolderCapacity, _numRowsToKeep),
           new SortUtils.SortComparator(collationKeys, collationDirections, dataSchema, false, true));
       _rows = null;
     }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
index f2edadeae5..ebdb23aa3e 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java
@@ -334,7 +334,8 @@ public class SortOperatorTest {
     List<Direction> directions = ImmutableList.of(Direction.ASCENDING);
     DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT});
     SortOperator op =
-        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 1);
+        new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, false, 10,
+            1);
 
     Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3}))
         .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org