You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/05/17 09:05:43 UTC

[ignite] branch master updated: IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f138a47442 IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.
9f138a47442 is described below

commit 9f138a4744269388d8ae238d6dcdae3c97f9a1ed
Author: Vladimir Steshin <vl...@gmail.com>
AuthorDate: Tue May 17 11:58:19 2022 +0300

    IGNITE-16013 SQL Calcite: Optimized sort-with-limit execution - Fixes #9987.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/LogicalRelImplementor.java  |  6 +-
 .../query/calcite/exec/rel/SortNode.java           | 69 +++++++++++++++--
 .../query/calcite/metadata/IgniteMdRowCount.java   |  6 ++
 .../query/calcite/metadata/cost/IgniteCost.java    |  6 ++
 .../processors/query/calcite/rel/IgniteLimit.java  | 29 +-------
 .../processors/query/calcite/rel/IgniteSort.java   | 86 +++++++++++++++++++---
 .../query/calcite/rule/SortConverterRule.java      | 29 +++++++-
 .../processors/query/calcite/trait/TraitUtils.java |  2 +-
 .../processors/query/calcite/util/RexUtils.java    | 15 ++++
 .../query/calcite/exec/rel/LimitExecutionTest.java | 56 ++++++++++++++
 .../calcite/planner/LimitOffsetPlannerTest.java    | 49 +++++++++---
 11 files changed, 296 insertions(+), 57 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 17be8cb2fd7..929707fea99 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -463,7 +463,11 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     @Override public Node<Row> visit(IgniteSort rel) {
         RelCollation collation = rel.getCollation();
 
-        SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation));
+        Supplier<Integer> offset = (rel.offset == null) ? null : expressionFactory.execute(rel.offset);
+        Supplier<Integer> fetch = (rel.fetch == null) ? null : expressionFactory.execute(rel.fetch);
+
+        SortNode<Row> node = new SortNode<>(ctx, rel.getRowType(), expressionFactory.comparator(collation), offset,
+            fetch);
 
         Node<Row> input = visit(rel.getInput());
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
index 00f3fdb59aa..56935bf6b7d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
@@ -16,12 +16,16 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.List;
 import java.util.PriorityQueue;
-
+import java.util.function.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
 import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Sort node.
@@ -39,14 +43,45 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
     /** Rows buffer. */
     private final PriorityQueue<Row> rows;
 
+    /** SQL select limit. Negative if disabled. */
+    private final int limit;
+
+    /** Reverse-ordered rows in case of limited sort. */
+    private List<Row> reversed;
+
     /**
      * @param ctx Execution context.
      * @param comp Rows comparator.
+     * @param offset Offset supplier, or {@code null} if absent. Here its value is only added to the fetch value.
+     * @param fetch Fetch (limit) supplier, or {@code null} to sort the input without a row bound.
      */
-    public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
+    public SortNode(
+        ExecutionContext<Row> ctx, RelDataType rowType,
+        Comparator<Row> comp,
+        @Nullable Supplier<Integer> offset,
+        @Nullable Supplier<Integer> fetch
+    ) {
         super(ctx, rowType);
 
-        rows = comp == null ? new PriorityQueue<>() : new PriorityQueue<>(comp);
+        assert fetch == null || fetch.get() >= 0; // NOTE(review): with -ea the supplier is invoked here and again below.
+        assert offset == null || offset.get() >= 0;
+
+        limit = fetch == null ? -1 : fetch.get() + (offset == null ? 0 : offset.get()); // -1 disables the limit.
+
+        if (limit < 0) // No fetch: plain unbounded queue.
+            rows = new PriorityQueue<>(comp);
+        else { // NOTE(review): limit == 0 also lands here - confirm GridBoundedPriorityQueue accepts a zero capacity.
+            rows = new GridBoundedPriorityQueue<>(limit, comp == null ? (Comparator<Row>)Comparator.reverseOrder()
+                : comp.reversed()); // Reversed order; presumably so the bounded queue evicts the largest row - confirm.
+        }
+    }
+
+    /**
+     * @param ctx Execution context.
+     * @param comp Rows comparator.
+     */
+    public SortNode(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row> comp) {
+        this(ctx, rowType, comp, null, null); // No offset and no fetch: the limit is disabled.
     }
 
     /** {@inheritDoc} */
@@ -54,6 +89,8 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
         requested = 0;
         waiting = 0;
         rows.clear();
+        if (reversed != null)
+            reversed.clear();
     }
 
     /** {@inheritDoc} */
@@ -84,6 +121,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
     @Override public void push(Row row) throws Exception {
         assert downstream() != null;
         assert waiting > 0;
+        assert reversed == null || reversed.isEmpty();
 
         checkState();
 
@@ -118,12 +156,31 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
 
         inLoop = true;
         try {
-            while (requested > 0 && !rows.isEmpty()) {
+            // Prepare final order (reversed).
+            if (limit > 0 && !rows.isEmpty()) {
+                if (reversed == null)
+                    reversed = new ArrayList<>(rows.size());
+
+                while (!rows.isEmpty()) {
+                    reversed.add(rows.poll());
+
+                    if (++processed >= IN_BUFFER_SIZE) {
+                        // Allow the others to do their job.
+                        context().execute(this::flush, this::onError);
+
+                        return;
+                    }
+                }
+
+                processed = 0;
+            }
+
+            while (requested > 0 && (reversed == null ? !rows.isEmpty() : !reversed.isEmpty())) {
                 checkState();
 
                 requested--;
 
-                downstream().push(rows.poll());
+                downstream().push(reversed == null ? rows.poll() : reversed.remove(reversed.size() - 1));
 
                 if (++processed >= IN_BUFFER_SIZE && requested > 0) {
                     // allow others to do their job
@@ -133,7 +190,7 @@ public class SortNode<Row> extends AbstractNode<Row> implements SingleNode<Row>,
                 }
             }
 
-            if (rows.isEmpty()) {
+            if (reversed == null ? rows.isEmpty() : reversed.isEmpty()) {
                 if (requested > 0)
                     downstream().end();
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 1d92923e1a8..68a24893321 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -22,6 +22,7 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdRowCount;
 import org.apache.calcite.rel.metadata.RelMdUtil;
@@ -53,6 +54,11 @@ public class IgniteMdRowCount extends RelMdRowCount {
         return rel.estimateRowCount(mq);
     }
 
+    /** {@inheritDoc} */
+    @Override public Double getRowCount(Sort rel, RelMetadataQuery mq) {
+        return rel.estimateRowCount(mq); // Delegate to the node's own estimate instead of computing one here.
+    }
+
     /** */
     @Nullable public static Double joinRowCount(RelMetadataQuery mq, Join rel) {
         if (!rel.getJoinType().projectsRight()) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
index babfb813901..543ec623cf2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/cost/IgniteCost.java
@@ -43,6 +43,12 @@ public class IgniteCost implements RelOptCost {
     /** Cost of a lookup at the hash. */
     public static final double HASH_LOOKUP_COST = 10;
 
+    /** Fraction of the input rows assumed as the fetch value when it is not a literal (e.g. a DYNAMIC_PARAM). */
+    public static final double FETCH_IS_PARAM_FACTOR = 0.01;
+
+    /** Fraction of the input rows assumed as the offset value when it is not a literal (e.g. a DYNAMIC_PARAM). */
+    public static final double OFFSET_IS_PARAM_FACTOR = 0.5;
+
     /**
      * With broadcast distribution each row will be sent to the each distination node,
      * thus the total bytes amount will be multiplies of the destination nodes count.
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
index 7f5aae17e3a..3bd29161807 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -30,22 +30,18 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
+
 /** */
 public class IgniteLimit extends SingleRel implements IgniteRel {
-    /** In case the fetch value is a DYNAMIC_PARAM. */
-    private static final double FETCH_IS_PARAM_FACTOR = 0.01;
-
-    /** In case the offset value is a DYNAMIC_PARAM. */
-    private static final double OFFSET_IS_PARAM_FACTOR = 0.5;
-
     /** Offset. */
     private final RexNode offset;
 
@@ -160,23 +156,6 @@ public class IgniteLimit extends SingleRel implements IgniteRel {
         return Math.min(lim, inputRowCount - off);
     }
 
-    /**
-     * @return Integer value of the literal expression.
-     */
-    private double doubleFromRex(RexNode n, double def) {
-        try {
-            if (n.isA(SqlKind.LITERAL))
-                return ((RexLiteral)n).getValueAs(Integer.class);
-            else
-                return def;
-        }
-        catch (Exception e) {
-            assert false : "Unable to extract value: " + e.getMessage();
-
-            return def;
-        }
-    }
-
     /**
      * @return Offset.
      */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
index 87b753421cf..9f5be7c0ffb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSort.java
@@ -37,12 +37,43 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.FETCH_IS_PARAM_FACTOR;
+import static org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost.OFFSET_IS_PARAM_FACTOR;
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
 
 /**
  * Ignite sort operator.
  */
 public class IgniteSort extends Sort implements IgniteRel {
+    /** */
+    private final boolean enforcer;
+
+    /**
+     * Creates a sort node that may carry offset and fetch expressions.
+     *
+     * @param cluster Cluster.
+     * @param traits Trait set.
+     * @param child Input node.
+     * @param collation Collation.
+     * @param offset Offset expression, or {@code null} if there is none.
+     * @param fetch Fetch (limit) expression, or {@code null} if there is none.
+     * @param enforcer Enforcer flag, returned as is by {@link #isEnforcer()}.
+     */
+    public IgniteSort(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode child,
+        RelCollation collation,
+        RexNode offset,
+        RexNode fetch,
+        boolean enforcer
+    ) {
+        super(cluster, traits, child, collation, offset, fetch);
+
+        this.enforcer = enforcer;
+    }
+
     /**
      * Constructor.
      *
@@ -50,18 +81,24 @@ public class IgniteSort extends Sort implements IgniteRel {
      * @param traits Trait set.
      * @param child Input node.
      * @param collation Collation.
+     * @param enforcer Enforcer flag.
      */
     public IgniteSort(
         RelOptCluster cluster,
         RelTraitSet traits,
         RelNode child,
-        RelCollation collation) {
-        super(cluster, traits, child, collation);
+        RelCollation collation,
+        boolean enforcer
+    ) {
+        this(cluster, traits, child, collation, null, null, enforcer);
     }
 
     /** */
     public IgniteSort(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
+
+        // No need to enforce anything in a plan that is already prepared, fragmented and sent.
+        enforcer = false;
     }
 
     /** {@inheritDoc} */
@@ -72,9 +109,7 @@ public class IgniteSort extends Sort implements IgniteRel {
         RexNode offset,
         RexNode fetch
     ) {
-        assert offset == null && fetch == null;
-
-        return new IgniteSort(getCluster(), traitSet, newInput, newCollation);
+        return new IgniteSort(getCluster(), traitSet, newInput, traitSet.getCollation(), offset, fetch, enforcer);
     }
 
     /** {@inheritDoc} */
@@ -92,9 +127,13 @@ public class IgniteSort extends Sort implements IgniteRel {
         if (isEnforcer() || required.getConvention() != IgniteConvention.INSTANCE)
             return null;
 
-        RelCollation collation = TraitUtils.collation(required);
+        RelCollation requiredCollation = TraitUtils.collation(required);
+        RelCollation relCollation = traitSet.getCollation();
 
-        return Pair.of(required.replace(collation), ImmutableList.of(required.replace(RelCollations.EMPTY)));
+        if (!requiredCollation.satisfies(relCollation))
+            return null;
+
+        return Pair.of(required, ImmutableList.of(required.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
@@ -107,16 +146,24 @@ public class IgniteSort extends Sort implements IgniteRel {
         return Pair.of(childTraits.replace(collation()), ImmutableList.of(childTraits));
     }
 
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        return memRows(mq.getRowCount(getInput())); // Output is at most fetch + offset rows, capped by the input size.
+    }
+
     /** {@inheritDoc} */
     @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
-        double rows = mq.getRowCount(getInput());
+        double inputRows = mq.getRowCount(getInput());
+
+        double memRows = memRows(inputRows);
 
-        double cpuCost = rows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogN(rows) * IgniteCost.ROW_COMPARISON_COST;
-        double memory = rows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+        double cpuCost = inputRows * IgniteCost.ROW_PASS_THROUGH_COST + Util.nLogM(inputRows, memRows)
+            * IgniteCost.ROW_COMPARISON_COST;
+        double memory = memRows * getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
 
         IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
 
-        RelOptCost cost = costFactory.makeCost(rows, cpuCost, 0, memory, 0);
+        RelOptCost cost = costFactory.makeCost(inputRows, cpuCost, 0, memory, 0);
 
         // Distributed sorting is more preferable than sorting on the single node.
         if (TraitUtils.distribution(traitSet).satisfies(IgniteDistributions.single()))
@@ -127,6 +174,21 @@ public class IgniteSort extends Sort implements IgniteRel {
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
-        return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation);
+        return new IgniteSort(cluster, getTraitSet(), sole(inputs), collation, offset, fetch, enforcer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEnforcer() {
+        return enforcer; // Set at construction; always false for a node restored from a RelInput.
+    }
+
+    /** Estimated number of rows to keep in memory and sort: {@code fetch + offset}, capped by {@code inputRows}. */
+    private double memRows(double inputRows) {
+        double fetch = this.fetch != null ? doubleFromRex(this.fetch, inputRows * FETCH_IS_PARAM_FACTOR)
+            : inputRows; // No fetch: every input row is kept. A non-literal fetch is estimated as a fraction of the input.
+        double offset = this.offset != null ? doubleFromRex(this.offset, inputRows * OFFSET_IS_PARAM_FACTOR)
+            : 0; // No offset. A non-literal offset is likewise estimated as a fraction of the input.
+
+        return Math.min(inputRows, fetch + offset);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
index 1b26acab3d4..a3020b4c4d4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/SortConverterRule.java
@@ -16,11 +16,13 @@
  */
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalSort;
@@ -67,15 +69,36 @@ public class SortConverterRule extends RelRule<SortConverterRule.Config> {
                 .replace(sort.getCollation())
                 .replace(IgniteDistributions.single());
 
-            call.transformTo(new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset,
-                sort.fetch));
+            if (sort.collation == RelCollations.EMPTY || sort.fetch == null) {
+                call.transformTo(new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset,
+                    sort.fetch));
+            }
+            else {
+                RelNode igniteSort = new IgniteSort(
+                    cluster,
+                    cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation()),
+                    convert(sort.getInput(), cluster.traitSetOf(IgniteConvention.INSTANCE)),
+                    sort.getCollation(),
+                    sort.offset,
+                    sort.fetch,
+                    false
+                );
+
+                call.transformTo(
+                    new IgniteLimit(cluster, traits, convert(igniteSort, traits), sort.offset, sort.fetch),
+                    ImmutableMap.of(
+                        new IgniteLimit(cluster, traits, convert(sort.getInput(), traits), sort.offset, sort.fetch),
+                        sort
+                    )
+                );
+            }
         }
         else {
             RelTraitSet outTraits = cluster.traitSetOf(IgniteConvention.INSTANCE).replace(sort.getCollation());
             RelTraitSet inTraits = cluster.traitSetOf(IgniteConvention.INSTANCE);
             RelNode input = convert(sort.getInput(), inTraits);
 
-            call.transformTo(new IgniteSort(cluster, outTraits, input, sort.getCollation()));
+            call.transformTo(new IgniteSort(cluster, outTraits, input, sort.getCollation(), false));
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 7218562727b..29744c06070 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -132,7 +132,7 @@ public class TraitUtils {
 
         RelTraitSet traits = rel.getTraitSet().replace(toTrait);
 
-        return new IgniteSort(rel.getCluster(), traits, rel, toTrait);
+        return new IgniteSort(rel.getCluster(), traits, rel, toTrait, true);
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
index 77ee156cb99..b00b1dcbf5e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
@@ -525,6 +525,21 @@ public class RexUtils {
         return keys;
     }
 
+    /** @return Double value of {@code n} if it is a literal expression, otherwise the default {@code def}. */
+    public static double doubleFromRex(RexNode n, double def) {
+        try {
+            if (n.isA(SqlKind.LITERAL))
+                return ((RexLiteral)n).getValueAs(Double.class);
+            else
+                return def; // Not a literal: use the caller's default.
+        }
+        catch (Exception e) {
+            assert false : "Unable to extract value: " + e.getMessage(); // Fails only when assertions are enabled.
+
+            return def; // Without assertions the failure is absorbed and the default is used.
+        }
+    }
+
     /** */
     public static Set<CorrelationId> extractCorrelationIds(List<RexNode> nodes) {
         final Set<CorrelationId> cors = new HashSet<>();
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
index ead96fa2e19..5c927d113b3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/LimitExecutionTest.java
@@ -17,8 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
@@ -48,6 +52,58 @@ public class LimitExecutionTest extends AbstractExecutionTest {
         checkLimit(2000, 3000);
     }
 
+    /** Tests that the Sort node limits its output when the fetch parameter is set, for sizes around the buffer size. */
+    @Test
+    public void testSortLimit() throws Exception {
+        int bufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        checkLimitSort(0, 1); // Arguments are (offset, fetch); a zero fetch means no limit.
+        checkLimitSort(1, 0);
+        checkLimitSort(1, 1);
+        checkLimitSort(0, bufSize);
+        checkLimitSort(bufSize, 0);
+        checkLimitSort(bufSize, bufSize);
+        checkLimitSort(bufSize - 1, 1);
+        checkLimitSort(2000, 0);
+        checkLimitSort(0, 3000);
+        checkLimitSort(2000, 3000);
+    }
+
+    /**
+     * @param offset Rows offset passed to the sort node.
+     * @param fetch Fetch rows count (zero means unlimited: no fetch supplier is passed to the sort node).
+     */
+    private void checkLimitSort(int offset, int fetch) {
+        assert offset >= 0;
+        assert fetch >= 0;
+
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+        RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+
+        SortNode<Object[]> sortNode = new SortNode<>(ctx, rowType, F::compareArrays, () -> offset,
+            fetch == 0 ? null : () -> fetch);
+
+        List<Object[]> data = IntStream.range(0, SourceNode.IN_BUFFER_SIZE + fetch + offset).boxed()
+            .map(i -> new Object[] {i}).collect(Collectors.toList());
+        Collections.shuffle(data); // Unordered input, so the ordering checked below must come from the sort node.
+
+        ScanNode<Object[]> srcNode = new ScanNode<>(ctx, rowType, data);
+
+        rootNode.register(sortNode);
+
+        sortNode.register(srcNode);
+
+        for (int i = 0; i < offset + fetch; i++) { // The first offset + fetch rows are expected in order, offset rows included.
+            assertTrue(rootNode.hasNext());
+            assertEquals(i, rootNode.next()[0]);
+        }
+
+        assertEquals(fetch == 0, rootNode.hasNext()); // Rows may remain only when no fetch was set.
+    }
+
     /**
      * @param offset Rows offset.
      * @param fetch Fetch rows count (zero means unlimited).
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
index 595f7801fb5..ae338ce608e 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/LimitOffsetPlannerTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
@@ -33,6 +34,8 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem
 import org.apache.ignite.internal.util.typedef.F;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.query.calcite.util.RexUtils.doubleFromRex;
+
 /**
  * Planner test for LIMIT and OFFSET.
  */
@@ -82,16 +85,36 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
         IgniteSchema publicSchema = createSchemaWithTable(IgniteDistributions.random());
 
         // Simple case, Limit can't be pushed down under Exchange or Sort. Sort before Exchange is more preferable.
-        assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 10 OFFSET 10", publicSchema,
+        assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5 OFFSET 10", publicSchema,
+            isInstanceOf(IgniteLimit.class)
+                .and(input(isInstanceOf(IgniteExchange.class)
+                    .and(input(isInstanceOf(IgniteSort.class)
+                        .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+                        .and(s -> doubleFromRex(s.offset, -1) == 10.0))))));
+
+        // Same simple case but without offset.
+        assertPlan("SELECT * FROM TEST ORDER BY ID LIMIT 5", publicSchema,
+            isInstanceOf(IgniteLimit.class)
+                .and(input(isInstanceOf(IgniteExchange.class)
+                    .and(input(isInstanceOf(IgniteSort.class)
+                        .and(s -> doubleFromRex(s.fetch, -1) == 5.0)
+                        .and(s -> s.offset == null))))));
+
+        // No special limited sort required if LIMIT is not set.
+        assertPlan("SELECT * FROM TEST ORDER BY ID OFFSET 10", publicSchema,
             isInstanceOf(IgniteLimit.class)
                 .and(input(isInstanceOf(IgniteExchange.class)
-                    .and(input(isInstanceOf(IgniteSort.class))))));
+                    .and(input(isInstanceOf(IgniteSort.class)
+                        .and(s -> s.fetch == null)
+                        .and(s -> s.offset == null))))));
 
         // Simple case without ordering.
-        assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY", publicSchema,
+        assertPlan("SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 5 ROWS ONLY", publicSchema,
             isInstanceOf(IgniteLimit.class)
+                .and(s -> doubleFromRex(s.fetch(), -1) == 5)
+                .and(s -> doubleFromRex(s.offset(), -1) == 10)
                 .and(input(isInstanceOf(IgniteExchange.class)))
-                .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
+                    .and(hasChildThat(isInstanceOf(IgniteSort.class)).negate()));
 
         // Check that Sort node is not eliminated by aggregation and Exchange node is not eliminated by distribution
         // required by parent nodes.
@@ -99,27 +122,31 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
             nodeOrAnyChild(isInstanceOf(IgniteUnionAll.class)
                 .and(hasChildThat(isInstanceOf(IgniteLimit.class)
                     .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(input(isInstanceOf(IgniteSort.class)))))))));
+                        .and(input(isInstanceOf(IgniteSort.class)
+                            .and(s -> doubleFromRex(s.fetch, -1) == 10.0)))))))));
 
         // Check that internal Sort node is not eliminated by external Sort node with different collation.
         assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10) ORDER BY VAL", publicSchema,
             nodeOrAnyChild(isInstanceOf(IgniteSort.class)
                 .and(hasChildThat(isInstanceOf(IgniteLimit.class)
                     .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(input(isInstanceOf(IgniteSort.class)))))))));
+                        .and(input(isInstanceOf(IgniteSort.class)
+                            .and(s -> doubleFromRex(s.fetch, -1) == 10.0)))))))));
 
         // Check that extended collation is passed through the Limit node if it satisfies the Limit collation.
         assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID LIMIT 10) ORDER BY ID, VAL", publicSchema,
             isInstanceOf(IgniteLimit.class)
                 .and(input(isInstanceOf(IgniteExchange.class)
                     .and(input(isInstanceOf(IgniteSort.class)
+                        .and(input(isInstanceOf(IgniteTableScan.class)))
                         .and(s -> s.collation().getKeys().equals(ImmutableIntList.of(0, 1))))))));
 
         // Check that external Sort node is not required if external collation is subset of internal collation.
         assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT 10) ORDER BY ID", publicSchema,
             isInstanceOf(IgniteLimit.class)
                 .and(input(isInstanceOf(IgniteExchange.class)
-                    .and(input(isInstanceOf(IgniteSort.class))))));
+                    .and(input(isInstanceOf(IgniteSort.class)
+                        .and(s -> doubleFromRex(s.fetch, -1) == 10.0))))));
 
         // Check double limit when external collation is a subset of internal collation.
         assertPlan("SELECT * FROM (SELECT * FROM TEST ORDER BY ID, VAL LIMIT 10) ORDER BY ID LIMIT 5 OFFSET 3",
@@ -127,7 +154,9 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
             isInstanceOf(IgniteLimit.class)
                 .and(input(isInstanceOf(IgniteLimit.class)
                     .and(input(isInstanceOf(IgniteExchange.class)
-                        .and(input(isInstanceOf(IgniteSort.class))))))));
+                        .and(input(isInstanceOf(IgniteSort.class)
+                            .and(s -> doubleFromRex(s.fetch, -1) == 10.0)
+                            .and(s -> s.offset == null))))))));
 
         // Check limit/exchange/sort rel order in subquery.
         assertPlan("SELECT NULLIF((SELECT id FROM test ORDER BY id LIMIT 1 OFFSET 1), id) FROM test",
@@ -135,7 +164,9 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
             hasChildThat(isInstanceOf(IgniteLimit.class)
                 .and(input(isInstanceOf(IgniteExchange.class)
                     .and(e -> e.distribution() == IgniteDistributions.single())
-                    .and(input(isInstanceOf(IgniteSort.class)))))));
+                    .and(input(isInstanceOf(IgniteSort.class)
+                        .and(s -> doubleFromRex(s.offset, -1) == 1)
+                        .and(s -> doubleFromRex(s.fetch, -1) == 1)))))));
 
         publicSchema = createSchemaWithTable(IgniteDistributions.random(), 0);