You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/05/17 09:05:43 UTC
[ignite] branch master updated: IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9f138a47442 IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.
9f138a47442 is described below
commit 9f138a4744269388d8ae238d6dcdae3c97f9a1ed
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Tue May 17 11:58:19 2022 +0300
IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../query/calcite/exec/LogicalRelImplementor.java | 6 +-
.../query/calcite/exec/rel/SortNode.java | 69 +++++++++++++++--
.../query/calcite/metadata/IgniteMdRowCount.java | 6 ++
.../query/calcite/metadata/cost/IgniteCost.java | 6 ++
.../processors/query/calcite/rel/IgniteLimit.java | 29 +-------
.../processors/query/calcite/rel/IgniteSort.java | 86 +++++++++++++++++++---
.../query/calcite/rule/SortConverterRule.java | 29 +++++++-
.../processors/query/calcite/trait/TraitUtils.java | 2 +-
.../processors/query/calcite/util/RexUtils.java | 15 ++++
.../query/calcite/exec/rel/LimitExecutionTest.java | 56 ++++++++++++++
.../calcite/planner/LimitOffsetPlannerTest.java | 49 +++++++++---
11 files changed, 296 insertions(+), 57 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 17be8cb2fd7..929707fea99 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -463,7 +463,11 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
@Override public Node<Row> visit(IgniteSort rel) {
RelCollation collation = rel.getCollation();
- SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation));
+ Supplier<Integer> offset = (rel.offset == null) ? null : expressionFactory.execute(rel.offset);
+ Supplier<Integer> fetch = (rel.fetch == null) ? null : expressionFactory.execute(rel.fetch);
+
+ SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation), offset,
+ fetch);
Node<Row> input = visit(rel.getInput());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 00f3fdb59aa..56935bf6b7d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -16,12 +16,16 @@
*/
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
import java.util.PriorityQueue;
-
+import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
/**
* Sort node.
@@ -39,14 +43,45 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
/** Rows buffer. */
private final PriorityQueue<Row> rows;
+ /** SQL select limit. Negative if disabled. */
+ private final int limit;
+
+ /** Reverse-ordered rows in case of limited sort. */
+ private List<Row> reversed;
+
/**
* @param ctx Execution context.
* @param comp Rows comparator.
+ * @param offset Offset.
+ * @param fetch Limit.
*/
- public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
+ public SortNode(
+ ExecutionContext<Row> ctx, RelDataType rowType,
+ Comparator<Row> comp,
+ @Nullable Supplier<Integer> offset,
+ @Nullable Supplier<Integer> fetch
+ ) {
super(ctx, rowType);
- rows = comp == null ? new PriorityQueue<>() : new PriorityQueue<>(comp);
+ assert fetch == null || fetch.get() >= 0;
+ assert offset == null || offset.get() >= 0;
+
+ limit = fetch == null ? -1 : fetch.get() + (offset == null ? 0 : offset.get());
+
+ if (limit < 0)
+ rows = new PriorityQueue<>(comp);
+ else {
+ rows = new GridBoundedPriorityQueue<>(limit, comp == null ? (Comparator<Row>)Comparator.reverseOrder()
+ : comp.reversed());
+ }
+ }
+
+ /**
+ * @param ctx Execution context.
+ * @param comp Rows comparator.
+ */
+ public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
+ this(ctx, rowType, comp, null, null);
}
/** {@inheritDoc} */
@@ -54,6 +89,8 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
requested = 0;
waiting = 0;
rows.clear();
+ if (reversed != null)
+ reversed.clear();
}
/** {@inheritDoc} */
@@ -84,6 +121,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
@Override public void push(Row row) throws Exception {
assert downstream() != null;
assert waiting > 0;
+ assert reversed == null || reversed.isEmpty();
checkState();
@@ -118,12 +156,31 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
inLoop = true;
try {
- while (requested > 0 && !rows.isEmpty()) {
+ // Prepare final order (reversed).
+ if (limit > 0 && !rows.isEmpty()) {
+ if (reversed == null)
+ reversed = new ArrayList<>(rows.size());
+
+ while (!rows.isEmpty()) {
+ reversed.add(rows.poll());
+
+ if (++processed >= IN_BUFFER_SIZE) {
+ // Allow the others to do their job.
+ context().execute(this::flush, this::onError);
+
+ return;
+ }
+ }
+
+ processed = 0;
+ }
+
+ while (requested > 0 && (reversed == null ? !rows.isEmpty() : !reversed.isEmpty())) {
checkState();
requested--;
- downstream().push(rows.poll());
+ downstream().push(reversed == null ? rows.poll() : reversed.remove(reversed.size() - 1));
if (++processed >= IN_BUFFER_SIZE && requested > 0) {
// allow others to do their job
@@ -133,7 +190,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
}
}
- if (rows.isEmpty()) {
+ if (reversed == null ? rows.isEmpty() : reversed.isEmpty()) {
if (requested > 0)
downstream().end();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 1d92923e1a8..68a24893321 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -22,6 +22,7 @@ import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMdRowCount;
import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -53,6 +54,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
return rel.estimateRowCount(mq);
}
+ /** {@inheritDoc} */
+ @Override public Double getRowCount(Sort rel, RelMetadataQuery mq) {
+ return rel.estimateRowCount(mq);
+ }
+
/** */
@Nullable public static Double joinRowCount(RelMetadataQuery mq, Join rel) {
if (!rel.getJoinType().projectsRight()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
index babfb813901..543ec623cf2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@ -43,6 +43,12 @@ public class IgniteCost implements RelOptCost {
/** Cost of a lookup at the hash. */
public static final double HASH_LOOKUP_COST = 10;
+ /** In case the fetch value is a DYNAMIC_PARAM. */
+ public static final double FETCH_IS_PARAM_FACTOR = 0.01;
+
+ /** In case the offset value is a DYNAMIC_PARAM. */
+ public static final double OFFSET_IS_PARAM_FACTOR = 0.5;
+
/**
* With broadcast distribution each row will be sent to the each distination node,
* thus the total bytes amount will be multiplies of the destination nodes count.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
index 7f5aae17e3a..3bd29161807 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -30,22 +30,18 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
+
/** */
public class IgniteLimit extends SingleRel implements IgniteRel {
- /** In case the fetch value is a DYNAMIC_PARAM. */
- private static final double FETCH_IS_PARAM_FACTOR = 0.01;
-
- /** In case the offset value is a DYNAMIC_PARAM. */
- private static final double OFFSET_IS_PARAM_FACTOR = 0.5;
-
/** Offset. */
private final RexNode offset;
@@ -160,23 +156,6 @@ public class IgniteLimit extends SingleRel implements IgniteRel {
return Math.min(lim, inputRowCount - off);
}
- /**
- * @return Integer value of the literal expression.
- */
- private double doubleFromRex(RexNode n, double def) {
- try {
- if (n.isA(SqlKind.LITERAL))
- return ((RexLiteral)n).getValueAs(Integer.class);
- else
- return def;
- }
- catch (Exception e) {
- assert false : "Unable to extract value: " + e.getMessage();
-
- return def;
- }
- }
-
/**
* @return Offset.
*/
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
index 87b753421cf..9f5be7c0ffb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
@@ -37,12 +37,43 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
/**
* Ignite sort operator.
*/
public class IgniteSort extends Sort implements IgniteRel {
+ /** */
+ private final boolean enforcer;
+
+ /**
+ * Constructor.
+ *
+ * @param cluster Cluster.
+ * @param traits Trait set.
+ * @param child Input node.
+ * @param collation Collation.
+ * @param offset Offset.
+ * @param fetch Limit.
+ * @param enforcer Enforcer flag.
+ */
+ public IgniteSort(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ RelCollation collation,
+ RexNode offset,
+ RexNode fetch,
+ boolean enforcer
+ ) {
+ super(cluster, traits, child, collation, offset, fetch);
+
+ this.enforcer = enforcer;
+ }
+
/**
* Constructor.
*
@@ -50,18 +81,24 @@ public class IgniteSort extends Sort implements IgniteRel {
* @param traits Trait set.
* @param child Input node.
* @param collation Collation.
+ * @param enforcer Enforcer flag.
*/
public IgniteSort(
RelOptCluster cluster,
RelTraitSet traits,
RelNode child,
- RelCollation collation) {
- super(cluster, traits, child, collation);
+ RelCollation collation,
+ boolean enforcer
+ ) {
+ this(cluster, traits, child, collation, null, null, enforcer);
}
/** */
public IgniteSort(RelInput input) {
super(changeTraits(input, IgniteConvention.INSTANCE));
+
+ // No need to enforce anything on ready, fragmented and sent plan.
+ enforcer = false;
}
/** {@inheritDoc} */
@@ -72,9 +109,7 @@ public class IgniteSort extends Sort implements IgniteRel {
RexNode offset,
RexNode fetch
) {
- assert offset == null && fetch == null;
-
- return new IgniteSort(getCluster(), traitSet, newInput, newCollation);
+ return new IgniteSort(getCluster(), traitSet, newInput, traitSet.getCollation(), offset, fetch, enforcer);
}
/** {@inheritDoc} */
@@ -92,9 +127,13 @@ public class IgniteSort extends Sort implements IgniteRel {
if (isEnforcer() || required.getConvention() != IgniteConvention.INSTANCE)
return null;
- RelCollation collation = TraitUtils.collation(required);
+ RelCollation requiredCollation = TraitUtils.collation(required);
+ RelCollation relCollation = traitSet.getCollation();
- return Pair.of(required.replace(collation), ImmutableList.of(required.replace(RelCollations.EMPTY)));
+ if (!requiredCollation.satisfies(relCollation))
+ return null;
+
+ return Pair.of(required, ImmutableList.of(required.replace(RelCollations.EMPTY)));
}
/** {@inheritDoc} */
@@ -107,16 +146,24 @@ public class IgniteSort extends Sort implements IgniteRel {
return Pair.of(childTraits.replace(collation()), ImmutableList.of(childTraits));
}
+ /** {@inheritDoc} */
+ @Override public double estimateRowCount(RelMetadataQuery mq) {
+ return memRows(mq.getRowCount(getInput()));
+ }
+
/** {@inheritDoc} */
@Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
- double rows = mq.getRowCount(getInput());
+ double inputRows = mq.getRowCount(getInput());
+
+ double memRows = memRows(inputRows);
- double cpuCost = rows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogN(rows) * IgniteCost.ROW_COMPARISON_COST;
- double memory = rows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+ double cpuCost = inputRows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogM(inputRows, memRows)
+ * IgniteCost.ROW_COMPARISON_COST;
+ double memory = memRows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
- RelOptCost cost = costFactory.makeCost(rows, cpuCost, 0, memory, 0);
+ RelOptCost cost = costFactory.makeCost(inputRows, cpuCost, 0, memory, 0);
// Distributed sorting is more preferable than sorting on the single node.
if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
@@ -127,6 +174,21 @@ public class IgniteSort extends Sort implements IgniteRel {
/** {@inheritDoc} */
@Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
- return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
+ return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation, offset, fetch, enforcer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEnforcer() {
+ return enforcer;
+ }
+
+ /** Rows number to keep in memory and sort. */
+ private double memRows(double inputRows) {
+ double fetch = this.fetch != null ? doubleFromRex(this.fetch, inputRows * FETCH_IS_PARAM_FACTOR)
+ : inputRows;
+ double offset = this.offset != null ? doubleFromRex(this.offset, inputRows * OFFSET_IS_PARAM_FACTOR)
+ : 0;
+
+ return Math.min(inputRows, fetch + offset);
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
index 1b26acab3d4..a3020b4c4d4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
@@ -16,11 +16,13 @@
*/
package org.apache.ignite.internal.processors.query.calcite.rule;
+import com.google.common.collect.ImmutableMap;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
@@ -67,15 +69,36 @@ public class SortConverterRule extends RelRule<SortConverterRule.Config> {
.replace(sort.getCollation())
.replace(IgniteDistributions.single());
- call.transformTo(new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset,
- sort.fetch));
+ if (sort.collation == RelCollations.EMPTY || sort.fetch == null) {
+ call.transformTo(new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset,
+ sort.fetch));
+ }
+ else {
+ RelNode igniteSort = new IgniteSort(
+ cluster,
+ cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation()),
+ convert(sort.getInput(), cluster.traitSetOf(IgniteConvention.INSTANCE)),
+ sort.getCollation(),
+ sort.offset,
+ sort.fetch,
+ false
+ );
+
+ call.transformTo(
+ new IgniteLimit(cluster, traits, convert(igniteSort, traits), sort.offset, sort.fetch),
+ ImmutableMap.of(
+ new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset, sort.fetch),
+ sort
+ )
+ );
+ }
}
else {
RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
RelTraitSet inTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
RelNode input = convert(sort.getInput(), inTraits);
- call.transformTo(new IgniteSort(cluster, outTraits, input, sort.getCollation()));
+ call.transformTo(new IgniteSort(cluster, outTraits, input, sort.getCollation(), false));
}
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 7218562727b..29744c06070 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -132,7 +132,7 @@ public class TraitUtils {
RelTraitSet traits = rel.getTraitSet().replace(toTrait);
- return new IgniteSort(rel.getCluster(), traits, rel, toTrait);
+ return new IgniteSort(rel.getCluster(), traits, rel, toTrait, true);
}
/** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
index 77ee156cb99..b00b1dcbf5e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
@@ -525,6 +525,21 @@ public class RexUtils {
return keys;
}
+ /** @return Double value of the literal expression. */
+ public static double doubleFromRex(RexNode n, double def) {
+ try {
+ if (n.isA(SqlKind.LITERAL))
+ return ((RexLiteral)n).getValueAs(Double.class);
+ else
+ return def;
+ }
+ catch (Exception e) {
+ assert false : "Unable to extract value: " + e.getMessage();
+
+ return def;
+ }
+ }
+
/** */
public static Set<CorrelationId> extractCorrelationIds(List<RexNode> nodes) {
final Set<CorrelationId> cors = new HashSet<>();
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
index ead96fa2e19..5c927d113b3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -48,6 +52,58 @@ public class LimitExecutionTest extends AbstractExecutionTest {
checkLimit(2000, 3000);
}
+ /** Tests Sort node can limit its output when fetch param is set. */
+ @Test
+ public void testSortLimit() throws Exception {
+ int bufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+ checkLimitSort(0, 1);
+ checkLimitSort(1, 0);
+ checkLimitSort(1, 1);
+ checkLimitSort(0, bufSize);
+ checkLimitSort(bufSize, 0);
+ checkLimitSort(bufSize, bufSize);
+ checkLimitSort(bufSize - 1, 1);
+ checkLimitSort(2000, 0);
+ checkLimitSort(0, 3000);
+ checkLimitSort(2000, 3000);
+ }
+
+ /**
+ * @param offset Rows offset.
+ * @param fetch Fetch rows count (zero means unlimited).
+ */
+ private void checkLimitSort(int offset, int fetch) {
+ assert offset >= 0;
+ assert fetch >= 0;
+
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+ RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+
+ SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, F::compareArrays, () -> offset,
+ fetch == 0 ? null : () -> fetch);
+
+ List<Object[]> data = IntStream.range(0, SourceNode.IN_BUFFER_SIZE + fetch + offset).boxed()
+ .map(i -> new Object[] {i}).collect(Collectors.toList());
+ Collections.shuffle(data);
+
+ ScanNode<Object[]> srcNode = new ScanNode<>(ctx, rowType, data);
+
+ rootNode.register(sortNode);
+
+ sortNode.register(srcNode);
+
+ for (int i = 0; i < offset + fetch; i++) {
+ assertTrue(rootNode.hasNext());
+ assertEquals(i, rootNode.next()[0]);
+ }
+
+ assertEquals(fetch == 0, rootNode.hasNext());
+ }
+
/**
* @param offset Rows offset.
* @param fetch Fetch rows count (zero means unlimited).
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
index 595f7801fb5..ae338ce608e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -33,6 +34,8 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem
import org.apache.ignite.internal.util.typedef.F;
import org.junit.Test;
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
+
/**
* Planner test for LIMIT and OFFSET.
*/
@@ -82,16 +85,36 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
IgniteSchema publicSchema = createSchemaWithTable(IgniteDistributions.random());
// Simple case, Limit can't be pushed down under Exchange or Sort. Sort before Exchange is more preferable.
- assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10", publicSchema,
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10", publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> doubleFromRex(s.offset, -1) == 10.0))))));
+
+ // Same simple case but witout offset.
+ assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5", publicSchema,
+ isInstanceOf(IgniteLimit.class)
+ .and(input(isInstanceOf(IgniteExchange.class)
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+ .and(s -> s.offset == null))))));
+
+ // No special liited sort required if LIMIT is not set.
+ assertPlan("SELECT * FROM TEST ORDER BY ID OFFSET 10", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> s.fetch == null)
+ .and(s -> s.offset == null))))));
// Simple case without ordering.
- assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY", publicSchema,
+ assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 5 ROWS ONLY", publicSchema,
isInstanceOf(IgniteLimit.class)
+ .and(s -> doubleFromRex(s.fetch(), -1) == 5)
+ .and(s -> doubleFromRex(s.offset(), -1) == 10)
.and(input(isInstanceOf(IgniteExchange.class)))
- .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+ .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
// Check that Sort node is not eliminated by aggregation and Exchange node is not eliminated by distribution
// required by parent nodes.
@@ -99,27 +122,31 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
nodeOrAnyChild(isInstanceOf(IgniteUnionAll.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0)))))))));
// Check that internal Sort node is not eliminated by external Sort node with different collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10) ORDER BY VAL", publicSchema,
nodeOrAnyChild(isInstanceOf(IgniteSort.class)
.and(hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class)))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0)))))))));
// Check that extended collation is passed through the Limit node if it satisfies the Limit collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10) ORDER BY ID, VAL", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(input(isInstanceOf(IgniteSort.class)
+ .and(input(isInstanceOf(IgniteTableScan.class)))
.and(s -> s.collation().getKeys().equals(ImmutableIntList.of(0, 1))))))));
// Check that external Sort node is not required if external collation is subset of internal collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT 10) ORDER BY ID", publicSchema,
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0))))));
// Check double limit when external collation is a subset of internal collation.
assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT 10) ORDER BY ID LIMIT 5 OFFSET 3",
@@ -127,7 +154,9 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
- .and(input(isInstanceOf(IgniteSort.class))))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.fetch, -1) == 10.0)
+ .and(s -> s.offset == null))))))));
// Check limit/exchange/sort rel order in subquery.
assertPlan("SELECT NULLIF((SELECT id FROM test ORDER BY id LIMIT 1 OFFSET 1), id) FROM test",
@@ -135,7 +164,9 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
hasChildThat(isInstanceOf(IgniteLimit.class)
.and(input(isInstanceOf(IgniteExchange.class)
.and(e -> e.distribution() == IgniteDistributions.single())
- .and(input(isInstanceOf(IgniteSort.class)))))));
+ .and(input(isInstanceOf(IgniteSort.class)
+ .and(s -> doubleFromRex(s.offset, -1) == 1)
+ .and(s -> doubleFromRex(s.fetch, -1) == 1)))))));
publicSchema = createSchemaWithTable(IgniteDistributions.random(), 0);