You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/15 09:59:03 UTC

[ignite] branch sql-calcite updated: IGNITE-13545 Calcite integration. Index spool (#8596)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new f55f922  IGNITE-13545 Calcite integration. Index spool (#8596)
f55f922 is described below

commit f55f922c5c21667ae9e5cc3fb1c9491d17c38c3c
Author: tledkov <tl...@gridgain.com>
AuthorDate: Fri Jan 15 12:58:47 2021 +0300

    IGNITE-13545 Calcite integration. Index spool (#8596)
---
 .../query/calcite/exec/AbstractIndexScan.java      | 166 ++++++
 .../processors/query/calcite/exec/IndexScan.java   | 145 ++---
 .../query/calcite/exec/LogicalRelImplementor.java  |  39 +-
 .../query/calcite/exec/RuntimeTreeIndex.java       | 203 +++++++
 .../{TableSpoolNode.java => IndexSpoolNode.java}   | 152 +++--
 .../query/calcite/exec/rel/TableSpoolNode.java     |   2 +-
 .../query/calcite/metadata/IgniteMdRowCount.java   |  10 +
 .../calcite/metadata/IgniteMdSelectivity.java      |  15 +
 .../processors/query/calcite/prepare/Cloner.java   |  16 +-
 .../processors/query/calcite/prepare/Fragment.java |   6 +-
 .../query/calcite/prepare/FragmentSplitter.java    |   1 +
 .../query/calcite/prepare/IgniteRelShuttle.java    |   6 +
 .../query/calcite/prepare/PlannerPhase.java        |   4 +-
 .../processors/query/calcite/prepare/Splitter.java |   6 +-
 .../query/calcite/rel/AbstractIndexScan.java       |  73 +--
 .../query/calcite/rel/IgniteAggregate.java         |   4 +-
 .../rel/IgniteCorrelatedNestedLoopJoin.java        |  59 +-
 .../query/calcite/rel/IgniteIndexScan.java         |  25 +-
 .../query/calcite/rel/IgniteIndexSpool.java        | 150 +++++
 .../processors/query/calcite/rel/IgniteLimit.java  |  13 +
 .../query/calcite/rel/IgniteMergeJoin.java         |  70 +--
 .../query/calcite/rel/IgniteRelVisitor.java        |   5 +
 .../query/calcite/rel/IgniteTrimExchange.java      |   5 +-
 .../rel/ProjectableFilterableTableScan.java        |   2 +-
 .../rel/logical/IgniteLogicalIndexScan.java        |  44 +-
 .../rel/logical/IgniteLogicalTableScan.java        |   5 +-
 .../calcite/rule/CorrelatedNestedLoopJoinRule.java |  18 +-
 .../query/calcite/rule/FilterSpoolMergeRule.java   | 116 ++++
 .../calcite/rule/LogicalScanConverterRule.java     |  29 +-
 .../calcite/rule/logical/ProjectScanMergeRule.java |  57 +-
 .../query/calcite/schema/IgniteIndex.java          |   4 +-
 .../query/calcite/schema/TableDescriptorImpl.java  |   8 +-
 .../processors/query/calcite/trait/TraitUtils.java |  14 +
 .../processors/query/calcite/util/Commons.java     |  51 +-
 .../query/calcite/util/IndexConditions.java        | 141 +++++
 .../processors/query/calcite/util/RexUtils.java    |  56 +-
 ...oolTest.java => IndexSpoolIntegrationTest.java} |  15 +-
 .../query/calcite/exec/RuntimeTreeIndexTest.java   | 196 ++++++
 .../calcite/exec/rel/AbstractExecutionTest.java    | 100 ++++
 .../query/calcite/exec/rel/ExecutionTest.java      | 144 -----
 .../calcite/exec/rel/IndexSpoolExecutionTest.java  | 191 ++++++
 .../calcite/exec/rel/TableSpoolExecutionTest.java  | 104 ++++
 .../query/calcite/planner/AbstractPlannerTest.java | 654 +++++++++++++++++++++
 .../CorrelatedNestedLoopJoinPlannerTest.java       | 172 ++++++
 .../calcite/planner/IndexSpoolPlannerTest.java     | 267 +++++++++
 .../query/calcite/{ => planner}/PlannerTest.java   | 549 +----------------
 .../calcite/planner/TableSpoolPlannerTest.java     | 138 +++++
 ...lciteTestSuite.java => ExecutionTestSuite.java} |  32 +-
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |  12 +-
 .../apache/ignite/testsuites/PlannerTestSuite.java |  38 ++
 50 files changed, 3227 insertions(+), 1105 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
new file mode 100644
index 0000000..096e4ff
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractIndexScan.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.GridIndex;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Abstract index scan.
+ */
+public abstract class AbstractIndexScan<Row, IdxRow> implements Iterable<Row>, AutoCloseable {
+    /** */
+    private final GridIndex<IdxRow> idx;
+
+    /** Additional filters. */
+    private final Predicate<Row> filters;
+
+    /** Lower index scan bound. */
+    private final Supplier<Row> lowerBound;
+
+    /** Upper index scan bound. */
+    private final Supplier<Row> upperBound;
+
+    /** */
+    private final Function<Row, Row> rowTransformer;
+
+    /** */
+    protected final ExecutionContext<Row> ectx;
+
+    /** */
+    protected final RelDataType rowType;
+
+    /**
+     * @param ectx Execution context.
+     * @param idx Physical index.
+     * @param filters Additional filters.
+     * @param lowerBound Lower index scan bound.
+     * @param upperBound Upper index scan bound.
+     */
+    protected AbstractIndexScan(
+        ExecutionContext<Row> ectx,
+        RelDataType rowType,
+        GridIndex<IdxRow> idx,
+        Predicate<Row> filters,
+        Supplier<Row> lowerBound,
+        Supplier<Row> upperBound,
+        Function<Row, Row> rowTransformer
+    ) {
+        this.ectx = ectx;
+        this.rowType = rowType;
+        this.idx = idx;
+        this.filters = filters;
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+        this.rowTransformer = rowTransformer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized Iterator<Row> iterator() {
+        IdxRow lower = lowerBound == null ? null : row2indexRow(lowerBound.get());
+        IdxRow upper = upperBound == null ? null : row2indexRow(upperBound.get());
+
+        return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+    }
+
+    /** */
+    protected abstract IdxRow row2indexRow(Row bound);
+
+    /** */
+    protected abstract Row indexRow2Row(IdxRow idxRow) throws IgniteCheckedException;
+
+    /** */
+    protected abstract BPlusTree.TreeRowClosure<IdxRow, IdxRow> filterClosure();
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** */
+    private class IteratorImpl extends GridIteratorAdapter<Row> {
+        /** */
+        private final GridCursor<IdxRow> cursor;
+
+        /** Next element. */
+        private Row next;
+
+        /** */
+        private IteratorImpl(@NotNull GridCursor<IdxRow> cursor) {
+            this.cursor = cursor;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNextX() throws IgniteCheckedException {
+            advance();
+
+            return next != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row nextX() throws IgniteCheckedException {
+            advance();
+
+            if (next == null)
+                throw new NoSuchElementException();
+
+            Row res = next;
+
+            next = null;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeX() {
+            throw new UnsupportedOperationException("Remove is not supported.");
+        }
+
+        /** */
+        private void advance() throws IgniteCheckedException {
+            assert cursor != null;
+
+            if (next != null)
+                return;
+
+            while (next == null && cursor.next()) {
+                IdxRow idxRow = cursor.get();
+
+                Row r = indexRow2Row(idxRow);
+
+                if (filters != null && !filters.test(r))
+                    continue;
+
+                if (rowTransformer != null)
+                    r = rowTransformer.apply(r);
+
+                next = r;
+            }
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
index fbacfde..b9d3c9d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java
@@ -20,11 +20,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
-import org.apache.calcite.rel.type.RelDataType;
+
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -47,20 +46,17 @@ import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.opt.H2PlainRow;
 import org.apache.ignite.internal.processors.query.h2.opt.H2Row;
-import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.value.DataType;
 import org.h2.value.Value;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Scan on index.
  */
-public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
+public class IndexScan<Row> extends AbstractIndexScan<Row, H2Row> {
     /** */
     private final GridKernalContext kctx;
 
@@ -68,9 +64,6 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
     private final GridCacheContext<?, ?> cctx;
 
     /** */
-    private final ExecutionContext<Row> ectx;
-
-    /** */
     private final CacheObjectContext coCtx;
 
     /** */
@@ -80,20 +73,8 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
     private final RowFactory<Row> factory;
 
     /** */
-    private final GridIndex<H2Row> idx;
-
-    /** */
     private final AffinityTopologyVersion topVer;
 
-    /** Additional filters. */
-    private final Predicate<Row> filters;
-
-    /** Lower index scan bound. */
-    private final Supplier<Row> lowerBound;
-
-    /** Upper index scan bound. */
-    private final Supplier<Row> upperBound;
-
     /** */
     private final int[] parts;
 
@@ -104,10 +85,7 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
     private volatile List<GridDhtLocalPartition> reserved;
 
     /** */
-    private final Function<Row, Row> rowTransformer;
-
-    /** */
-    private final ImmutableBitSet requiredColunms;
+    private final ImmutableBitSet requiredColumns;
 
     /**
      * @param ectx Execution context.
@@ -126,36 +104,36 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
         Supplier<Row> lowerBound,
         Supplier<Row> upperBound,
         Function<Row, Row> rowTransformer,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable ImmutableBitSet requiredColumns
     ) {
-        this.ectx = ectx;
+        super(
+            ectx,
+            desc.rowType(ectx.getTypeFactory(), requiredColumns),
+            idx,
+            filters,
+            lowerBound,
+            upperBound,
+            rowTransformer
+        );
+
         this.desc = desc;
         cctx = desc.cacheContext();
         kctx = cctx.kernalContext();
         coCtx = cctx.cacheObjectContext();
 
-        RelDataType rowType = desc.rowType(this.ectx.getTypeFactory(), requiredColunms);
-
-        factory = this.ectx.rowHandler().factory(this.ectx.getTypeFactory(), rowType);
-        this.idx = idx;
+        factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
         topVer = ectx.planningContext().topologyVersion();
-        this.filters = filters;
-        this.lowerBound = lowerBound;
-        this.upperBound = upperBound;
         this.parts = parts;
         mvccSnapshot = ectx.mvccSnapshot();
-        this.rowTransformer = rowTransformer;
-        this.requiredColunms = requiredColunms;
+        this.requiredColumns = requiredColumns;
     }
 
     /** {@inheritDoc} */
     @Override public synchronized Iterator<Row> iterator() {
         reserve();
-        try {
-            H2Row lower = lowerBound == null ? null : new H2PlainRow(values(coCtx, ectx, lowerBound.get()));
-            H2Row upper = upperBound == null ? null : new H2PlainRow(values(coCtx, ectx, upperBound.get()));
 
-            return new IteratorImpl(idx.find(lower, upper, filterClosure()));
+        try {
+            return super.iterator();
         }
         catch (Exception e) {
             release();
@@ -164,6 +142,16 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override protected H2Row row2indexRow(Row bound) {
+        return new H2PlainRow(values(coCtx, ectx, bound));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Row indexRow2Row(H2Row row) throws IgniteCheckedException {
+        return desc.toRow(ectx, (CacheDataRow)row, factory, requiredColumns);
+    }
+
     /** */
     @Override public void close() {
         release();
@@ -213,12 +201,17 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
 
         try {
             for (GridDhtLocalPartition part : toReserve) {
-                if (part == null || !part.reserve())
-                    throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
+                if (part == null || !part.reserve()) {
+                    throw new ClusterTopologyException(
+                        "Failed to reserve partition for query execution. Retry on stable topology."
+                    );
+                }
                 else if (part.state() != GridDhtPartitionState.OWNING) {
                     part.release();
 
-                    throw new ClusterTopologyException("Failed to reserve partition for query execution. Retry on stable topology.");
+                    throw new ClusterTopologyException(
+                        "Failed to reserve partition for query execution. Retry on stable topology."
+                    );
                 }
 
                 reserved.add(part);
@@ -245,8 +238,8 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
         reserved = null;
     }
 
-    /** */
-    private H2TreeFilterClosure filterClosure() {
+    /** {@inheritDoc} */
+    @Override protected H2TreeFilterClosure filterClosure() {
         IndexingQueryFilter filter = new IndexingQueryFilterImpl(kctx, topVer, parts);
         IndexingQueryCacheFilter f = filter.forCache(cctx.name());
         H2TreeFilterClosure filterC = null;
@@ -277,66 +270,4 @@ public class IndexScan<Row> implements Iterable<Row>, AutoCloseable {
             throw new IgniteException("Failed to wrap object into H2 Value.", e);
         }
     }
-
-    /** */
-    private class IteratorImpl extends GridIteratorAdapter<Row> {
-        /** */
-        private final GridCursor<H2Row> cursor;
-
-        /** Next element. */
-        private Row next;
-
-        /** */
-        public IteratorImpl(@NotNull GridCursor<H2Row> cursor) {
-            this.cursor = cursor;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNextX() throws IgniteCheckedException {
-            advance();
-
-            return next != null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Row nextX() throws IgniteCheckedException {
-            advance();
-
-            if (next == null)
-                throw new NoSuchElementException();
-
-            Row res = next;
-
-            next = null;
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeX() {
-            throw new UnsupportedOperationException("Remove is not supported.");
-        }
-
-        /** */
-        private void advance() throws IgniteCheckedException {
-            assert cursor != null;
-
-            if (next != null)
-                return;
-
-            while (next == null && cursor.next()) {
-                H2Row h2Row = cursor.get();
-
-                Row r = desc.toRow(ectx, (CacheDataRow)h2Row, factory, requiredColunms);
-
-                if (filters != null && !filters.test(r))
-                    continue;
-
-                if (rowTransformer != null)
-                    r = rowTransformer.apply(r);
-
-                next = r;
-            }
-        }
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index e39349f..671e481 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.AggregateNod
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.LimitNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinNode;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ModifyNode;
@@ -57,6 +58,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -165,7 +167,6 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteTrimExchange rel) {
         assert TraitUtils.distribution(rel).getType() == HASH_DISTRIBUTED;
-        assert TraitUtils.distribution(rel.getInput()).getType() == BROADCAST_DISTRIBUTED;
 
         IgniteDistribution distr = rel.distribution();
         Destination<Row> dest = distr.destination(ctx, affSrvc, ctx.group(rel.sourceId()));
@@ -264,11 +265,11 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
         IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
         IgniteTypeFactory typeFactory = ctx.getTypeFactory();
 
-        ImmutableBitSet requiredColunms = rel.requiredColumns();
+        ImmutableBitSet requiredColumns = rel.requiredColumns();
         List<RexNode> lowerCond = rel.lowerBound();
         List<RexNode> upperCond = rel.upperBound();
 
-        RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
+        RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
 
         Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
         Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
@@ -279,7 +280,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
 
         CollocationGroup group = ctx.group(rel.sourceId());
 
-        Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColunms);
+        Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
 
         return new ScanNode<>(ctx, rowType, rowsIter);
     }
@@ -364,6 +365,36 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
     }
 
     /** {@inheritDoc} */
+    @Override public Node<Row> visit(IgniteIndexSpool rel) {
+        RelCollation collation = rel.collation();
+
+        assert rel.indexCondition() != null : rel;
+
+        List<RexNode> lowerBound = rel.indexCondition().lowerBound();
+        List<RexNode> upperBound = rel.indexCondition().upperBound();
+
+        Predicate<Row> filter = expressionFactory.predicate(rel.condition(), rel.getRowType());
+        Supplier<Row> lower = lowerBound == null ? null : expressionFactory.rowSource(lowerBound);
+        Supplier<Row> upper = upperBound == null ? null : expressionFactory.rowSource(upperBound);
+
+        IndexSpoolNode<Row> node = new IndexSpoolNode<>(
+            ctx,
+            rel.getRowType(),
+            collation,
+            expressionFactory.comparator(collation),
+            filter,
+            lower,
+            upper
+        );
+
+        Node<Row> input = visit(rel.getInput());
+
+        node.register(input);
+
+        return node;
+    }
+
+    /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteTableModify rel) {
         switch (rel.getOperation()) {
             case INSERT:
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java
new file mode 100644
index 0000000..cfd5669
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndex.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.query.GridIndex;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Runtime sorted index based on on-heap tree.
+ */
+public class RuntimeTreeIndex<Row> implements GridIndex<Row>, AutoCloseable {
+    /** */
+    protected final ExecutionContext<Row> ectx;
+
+    /** */
+    protected final Comparator<Row> comp;
+
+    /** Collation. */
+    private final RelCollation collation;
+
+    /** Rows. */
+    private TreeMap<Row, List<Row>> rows;
+
+    /**
+     *
+     */
+    public RuntimeTreeIndex(
+        ExecutionContext<Row> ectx,
+        RelCollation collation,
+        Comparator<Row> comp
+    ) {
+        this.ectx = ectx;
+        this.comp = comp;
+
+        assert Objects.nonNull(collation);
+
+        this.collation = collation;
+        rows = new TreeMap<>(comp);
+    }
+
+    /**
+     * Add row to index.
+     */
+    public void push(Row r) {
+        List<Row> newEqRows = new ArrayList<>();
+
+        List<Row> eqRows = rows.putIfAbsent(r, newEqRows);
+
+        if (eqRows != null)
+            eqRows.add(r);
+        else
+            newEqRows.add(r);
+    }
+
+    /** */
+    @Override public void close() {
+        rows.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCursor<Row> find(Row lower, Row upper, BPlusTree.TreeRowClosure<Row, Row> filterC) {
+        assert filterC == null;
+
+        int firstCol = F.first(collation.getKeys());
+
+        if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) != null)
+            return new Cursor(rows.subMap(lower, true, upper, true));
+        else if (ectx.rowHandler().get(firstCol, lower) == null && ectx.rowHandler().get(firstCol, upper) != null)
+            return new Cursor(rows.headMap(upper, true));
+        else if (ectx.rowHandler().get(firstCol, lower) != null && ectx.rowHandler().get(firstCol, upper) == null)
+            return new Cursor(rows.tailMap(lower, true));
+        else
+            return new Cursor(rows);
+    }
+
+    /**
+     * Return an iterable to scan index range from lower to upper bounds inclusive,
+     * filtered by {@code filter} predicate.
+     */
+    public Iterable<Row> scan(
+        ExecutionContext<Row> ectx,
+        RelDataType rowType,
+        Predicate<Row> filter,
+        Supplier<Row> lowerBound,
+        Supplier<Row> upperBound
+    ) {
+        return new IndexScan(rowType, this, filter, lowerBound, upperBound);
+    }
+
+    /**
+     *
+     */
+    private class Cursor implements GridCursor<Row> {
+        /** Sub map iterator. */
+        private final Iterator<Map.Entry<Row, List<Row>>> mapIt;
+
+        /** Iterator over rows with equal index keys. */
+        private Iterator<Row> listIt;
+
+        /** */
+        private Row row;
+
+        /** */
+        Cursor(SortedMap<Row, List<Row>> subMap) {
+            mapIt = subMap.entrySet().iterator();
+            listIt = null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() throws IgniteCheckedException {
+            if (!hasNext())
+                return false;
+
+            next0();
+
+            return true;
+        }
+
+        /** */
+        private boolean hasNext() {
+            return listIt != null && listIt.hasNext() || mapIt.hasNext();
+        }
+
+        /** */
+        private void next0() {
+            if (listIt == null || !listIt.hasNext())
+                listIt = mapIt.next().getValue().iterator();
+
+            row = listIt.next();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Row get() throws IgniteCheckedException {
+            return row;
+        }
+    }
+
+    /**
+     *
+     */
+    private class IndexScan extends AbstractIndexScan<Row, Row> {
+        /**
+         * @param rowType Row type.
+         * @param idx Physical index.
+         * @param filter Additional filters.
+         * @param lowerBound Lower index scan bound.
+         * @param upperBound Upper index scan bound.
+         */
+        IndexScan(
+            RelDataType rowType,
+            GridIndex<Row> idx,
+            Predicate<Row> filter,
+            Supplier<Row> lowerBound,
+            Supplier<Row> upperBound) {
+            super(RuntimeTreeIndex.this.ectx, rowType, idx, filter, lowerBound, upperBound, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Row row2indexRow(Row bound) {
+            return bound;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Row indexRow2Row(Row row) {
+            return row;
+        }
+
+        /** */
+        @Override protected BPlusTree.TreeRowClosure<Row, Row> filterClosure() {
+            return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
similarity index 53%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
index 4750998..d8745f6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolNode.java
@@ -17,47 +17,74 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Comparator;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeTreeIndex;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
- * Table spool node.
+ * Index spool node.
  */
-public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
-    /** How many rows are requested by downstream. */
-    private int requested;
-
-    /** How many rows are we waiting for from the upstream. {@code -1} means end of source stream. */
-    private int waiting;
+public class IndexSpoolNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
+    /** Scan. */
+    private final ScanNode<Row> scan;
 
-    /** Index of the current row to push. */
-    private int rowIdx;
+    /** Runtime index */
+    private final RuntimeTreeIndex<Row> idx;
 
-    /** Rows buffer. */
-    private final List<Row> rows;
+    /** */
+    private int requested;
 
-    /**
-     * Flag indicates that spool pushes row to downstream.
-     * Need to check a case when a downstream produces requests on push.
-     */
-    private boolean inLoop;
+    /** */
+    private int waiting;
 
     /**
      * @param ctx Execution context.
      */
-    public TableSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType) {
+    public IndexSpoolNode(
+        ExecutionContext<Row> ctx,
+        RelDataType rowType,
+        RelCollation collation,
+        Comparator<Row> comp,
+        Predicate<Row> filter,
+        Supplier<Row> lowerIdxBound,
+        Supplier<Row> upperIdxBound
+    ) {
         super(ctx, rowType);
 
-        rows = new ArrayList<>();
+        idx = new RuntimeTreeIndex<>(ctx, collation, comp);
+
+        scan = new ScanNode<>(
+            ctx,
+            rowType,
+            idx.scan(
+                ctx,
+                rowType,
+                filter,
+                lowerIdxBound,
+                upperIdxBound
+            )
+        );
+    }
+
+    /** */
+    @Override public void onRegister(Downstream<Row> downstream) {
+        scan.onRegister(downstream);
+    }
+
+    /** */
+    @Override public Downstream<Row> downstream() {
+        return scan.downstream();
     }
 
     /** {@inheritDoc} */
     @Override protected void rewindInternal() {
-        requested = 0;
-        rowIdx = 0;
+        scan.rewind();
     }
 
     /** {@inheritDoc} */
@@ -81,12 +108,13 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
         try {
             checkState();
 
-            requested += rowsCnt;
+            if (!indexReady()) {
+                requested = rowsCnt;
 
-            if (waiting == -1 && !inLoop)
-                context().execute(this::doPush);
-            else if (waiting == 0)
-                source().request(waiting = IN_BUFFER_SIZE);
+                requestSource();
+            }
+            else
+                scan.request(rowsCnt);
         }
         catch (Exception e) {
             onError(e);
@@ -94,49 +122,23 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
     }
 
     /** */
-    private void doPush() {
-        if (rowIdx >= rows.size() && waiting == -1 && requested > 0) {
-            downstream().end();
-
-            return;
-        }
-
-        while (requested > 0 && rowIdx < rows.size())
-            pushToDownstream();
-    }
-
-    /** */
-    private void pushToDownstream() {
-        inLoop = true;
+    private void requestSource() {
+        waiting = IN_BUFFER_SIZE;
 
-        downstream().push(rows.get(rowIdx));
-
-        inLoop = false;
-
-        rowIdx++;
-        requested--;
-
-        if (rowIdx >= rows.size() && waiting == -1 && requested > 0)
-            downstream().end();
+        source().request(IN_BUFFER_SIZE);
     }
 
     /** {@inheritDoc} */
     @Override public void push(Row row) {
-        assert downstream() != null;
-        assert waiting > 0;
-
         try {
             checkState();
 
-            waiting--;
+            idx.push(row);
 
-            rows.add(row);
+            waiting--;
 
             if (waiting == 0)
-                source().request(waiting = IN_BUFFER_SIZE);
-
-            if (requested > 0 && rowIdx < rows.size())
-                pushToDownstream();
+                context().execute(this::requestSource);
         }
         catch (Exception e) {
             onError(e);
@@ -145,19 +147,39 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
 
     /** {@inheritDoc} */
     @Override public void end() {
-        assert downstream() != null;
-        assert waiting > 0;
-
         try {
             checkState();
 
             waiting = -1;
 
-            if (rowIdx >= rows.size() && requested > 0)
-                downstream().end();
+            scan.request(requested);
         }
         catch (Exception e) {
-            downstream().onError(e);
+            scan.downstream().onError(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void closeInternal() {
+        try {
+            scan.close();
+        }
+        catch (Exception ex) {
+            onError(ex);
+        }
+
+        try {
+            idx.close();
         }
+        catch (Exception ex) {
+            onError(ex);
+        }
+
+        super.closeInternal();
+    }
+
+    /** */
+    private boolean indexReady() {
+        return waiting == -1;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
index 4750998..d1ad00a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolNode.java
@@ -83,7 +83,7 @@ public class TableSpoolNode<Row> extends AbstractNode<Row> implements SingleNode
 
             requested += rowsCnt;
 
-            if (waiting == -1 && !inLoop)
+            if ((waiting == -1 || rowIdx < rows.size()) && !inLoop)
                 context().execute(this::doPush);
             else if (waiting == 0)
                 source().request(waiting = IN_BUFFER_SIZE);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
index 5192d92..57cc7b2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdRowCount.java
@@ -30,6 +30,7 @@ import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.calcite.util.Util;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
@@ -104,4 +105,13 @@ public class IgniteMdRowCount extends RelMdRowCount {
 
         return rowsCount;
     }
+
+    /**
+     * RowCount of Spool equals to estimated row count of its child by default,
+     * but IndexSpool has internal filter that could filter out some rows,
+     * hence we need to estimate it differently.
+     */
+    public double getRowCount(IgniteIndexSpool rel, RelMetadataQuery mq) {
+        return rel.estimateRowCount(mq);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
index 1de7f26..624466c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdSelectivity.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import java.util.List;
+
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMdSelectivity;
@@ -30,6 +31,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
@@ -91,4 +93,17 @@ public class IgniteMdSelectivity extends RelMdSelectivity {
         RexNode diff = RelMdUtil.minusPreds(RexUtils.builder(rel), predicate, condition);
         return RelMdUtil.guessSelectivity(diff);
     }
+
+    /** */
+    public Double getSelectivity(IgniteIndexSpool rel, RelMetadataQuery mq, RexNode predicate) {
+        if (predicate != null) {
+            return mq.getSelectivity(rel.getInput(),
+                RelMdUtil.minusPreds(
+                    rel.getCluster().getRexBuilder(),
+                    predicate,
+                    rel.condition()));
+        }
+
+        return mq.getSelectivity(rel.getInput(), rel.condition());
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
index bc52fa8..34e1ee7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -45,13 +46,14 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 
 /** */
-class Cloner implements IgniteRelVisitor<IgniteRel> {
+public class Cloner implements IgniteRelVisitor<IgniteRel> {
     /** */
     private final RelOptCluster cluster;
 
     /** */
     private ImmutableList.Builder<IgniteReceiver> remotes;
 
+    /** */
     Cloner(RelOptCluster cluster) {
         this.cluster = cluster;
     }
@@ -77,6 +79,13 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** */
+    public static IgniteRel clone(IgniteRel r) {
+        Cloner c = new Cloner(r.getCluster());
+
+        return c.visit(r);
+    }
+
+    /** */
     private IgniteReceiver collect(IgniteReceiver receiver) {
         if (remotes != null)
             remotes.add(receiver);
@@ -158,6 +167,11 @@ class Cloner implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexSpool rel) {
+        return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteLimit rel) {
         return rel.clone(cluster, F.asList(visit((IgniteRel) rel.getInput())));
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 9db3bf1..4b213e6 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationMappingException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
@@ -34,6 +35,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -50,6 +53,7 @@ public class Fragment {
     private final IgniteRel root;
 
     /** Serialized root representation. */
+    @GridToStringExclude
     private final String rootSer;
 
     /** */
@@ -183,6 +187,6 @@ public class Fragment {
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return rootSer;
+        return S.toString(Fragment.class, this, "root", RelOptUtil.toString(root));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
index 83fe9e7..bf55da8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java
@@ -46,6 +46,7 @@ public class FragmentSplitter extends IgniteRelShuttle {
     /** */
     private FragmentProto curr;
 
+    /** */
     public FragmentSplitter(RelNode cutPoint) {
         this.cutPoint = cutPoint;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
index c29e2e2..870962c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedN
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMergeJoin;
@@ -145,6 +146,11 @@ public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteRel visit(IgniteIndexSpool rel) {
+        return processNode(rel);
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteRel rel) {
         return rel.accept(this);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
index a18899d..768fc24 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerPhase.java
@@ -42,6 +42,7 @@ import org.apache.calcite.tools.RuleSets;
 import org.apache.ignite.internal.processors.query.calcite.rule.AggregateConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.CorrelatedNestedLoopJoinRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.FilterConverterRule;
+import org.apache.ignite.internal.processors.query.calcite.rule.FilterSpoolMergeRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.MergeJoinConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.rule.NestedLoopJoinConverterRule;
@@ -165,7 +166,8 @@ public enum PlannerPhase {
                     FilterConverterRule.INSTANCE,
                     TableModifyConverterRule.INSTANCE,
                     UnionConverterRule.INSTANCE,
-                    SortConverterRule.INSTANCE
+                    SortConverterRule.INSTANCE,
+                    FilterSpoolMergeRule.INSTANCE
                 )
             );
         }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
index f170a4b..735fa85 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
@@ -49,8 +50,11 @@ public class Splitter extends IgniteRelShuttle {
 
         while (!stack.isEmpty()) {
             curr = stack.pop();
+
             curr.root = visit(curr.root);
+
             res.add(curr.build());
+
             curr = null;
         }
 
@@ -81,7 +85,7 @@ public class Splitter extends IgniteRelShuttle {
 
     /** {@inheritDoc} */
     @Override public IgniteRel visit(IgniteTrimExchange rel) {
-        return rel.clone(IdGenerator.nextId());
+        return ((IgniteTrimExchange)processNode(rel)).clone(IdGenerator.nextId());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
index 5281757..7260e75 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/AbstractIndexScan.java
@@ -28,16 +28,12 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
-import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -48,16 +44,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
     protected final String idxName;
 
     /** */
-    protected List<RexNode> lowerCond;
-
-    /** */
-    protected List<RexNode> upperCond;
-
-    /** */
-    protected List<RexNode> lowerBound;
-
-    /** */
-    protected List<RexNode> upperBound;
+    protected final IndexConditions idxCond;
 
     /**
      * Constructor used for deserialization.
@@ -67,8 +54,7 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
     protected AbstractIndexScan(RelInput input) {
         super(input);
         idxName = input.getString("index");
-        lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
-        upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
+        idxCond = new IndexConditions(input);
     }
 
     /** */
@@ -80,23 +66,21 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
         String idxName,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable List<RexNode> lowerCond,
-        @Nullable List<RexNode> upperCond,
-        @Nullable ImmutableBitSet reqColunms
+        @Nullable IndexConditions idxCond,
+        @Nullable ImmutableBitSet reqColumns
     ) {
-        super(cluster, traitSet, hints, table, proj, cond, reqColunms);
+        super(cluster, traitSet, hints, table, proj, cond, reqColumns);
+
         this.idxName = idxName;
-        this.lowerCond = lowerCond;
-        this.upperCond = upperCond;
+        this.idxCond = idxCond;
     }
 
     /** {@inheritDoc} */
     @Override protected RelWriter explainTerms0(RelWriter pw) {
         pw = pw.item("index", idxName);
         pw = super.explainTerms0(pw);
-        return pw
-            .itemIf("lower", lowerBound, !F.isEmpty(lowerBound()))
-            .itemIf("upper", upperBound, !F.isEmpty(upperBound()));
+
+        return idxCond.explainTerms(pw);
     }
 
     /**
@@ -110,44 +94,28 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
      * @return Lower index condition.
      */
     public List<RexNode> lowerCondition() {
-        return lowerCond;
+        return idxCond == null ? null : idxCond.lowerCondition();
     }
 
     /**
      * @return Lower index condition.
      */
     public List<RexNode> lowerBound() {
-        if (lowerBound == null && lowerCond != null) {
-            RelDataType rowType = getTable().getRowType();
-            Mappings.TargetMapping mapping = null;
-            if (requiredColumns() != null)
-                mapping = Commons.inverceMapping(requiredColumns(), rowType.getFieldCount());
-            lowerBound = RexUtils.asBound(getCluster(), lowerCond, rowType, mapping);
-        }
-
-        return lowerBound;
+        return idxCond == null ? null : idxCond.lowerBound();
     }
 
     /**
      * @return Upper index condition.
      */
     public List<RexNode> upperCondition() {
-        return upperCond;
+        return idxCond == null ? null : idxCond.upperCondition();
     }
 
     /**
      * @return Upper index condition.
      */
     public List<RexNode> upperBound() {
-        if (upperBound == null && upperCond != null) {
-            RelDataType rowType = getTable().getRowType();
-            Mappings.TargetMapping mapping = null;
-            if (requiredColumns() != null)
-                mapping = Commons.inverceMapping(requiredColumns(), rowType.getFieldCount());
-            upperBound = RexUtils.asBound(getCluster(), upperCond, rowType, mapping);
-        }
-
-        return upperBound;
+        return idxCond == null ? null : idxCond.upperBound();
     }
 
     /** {@inheritDoc} */
@@ -163,16 +131,16 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
 
             cost = 0;
 
-            if (lowerCond != null) {
-                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, lowerCond));
+            if (lowerCondition() != null) {
+                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, lowerCondition()));
 
                 selectivity -= 1 - selectivity0;
 
                 cost += Math.log(rows);
             }
 
-            if (upperCond != null) {
-                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, upperCond));
+            if (upperCondition() != null) {
+                double selectivity0 = mq.getSelectivity(this, RexUtil.composeDisjunction(builder, upperCondition()));
 
                 selectivity -= 1 - selectivity0;
             }
@@ -187,4 +155,9 @@ public abstract class AbstractIndexScan extends ProjectableFilterableTableScan {
 
         return planner.getCostFactory().makeCost(rows, cost, 0);
     }
+
+    /** */
+    public IndexConditions indexConditions() {
+        return idxCond;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
index f3eef52..e951b34 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteAggregate.java
@@ -113,7 +113,7 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
                 ImmutableIntList keys = distribution.getKeys();
 
                 if (isSimple(this) && groupSet.cardinality() == keys.size()) {
-                    Mappings.TargetMapping mapping = Commons.inverceMapping(
+                    Mappings.TargetMapping mapping = Commons.inverseMapping(
                         groupSet, getInput().getRowType().getFieldCount());
 
                     List<Integer> srcKeys = new ArrayList<>(keys.size());
@@ -188,7 +188,7 @@ public class IgniteAggregate extends Aggregate implements TraitsAwareIgniteRel {
                     ImmutableIntList keys = distribution.getKeys();
 
                     if (groupSet.cardinality() == keys.size()) {
-                        Mappings.TargetMapping mapping = Commons.inverceMapping(
+                        Mappings.TargetMapping mapping = Commons.inverseMapping(
                             groupSet, getInput().getRowType().getFieldCount());
 
                         IgniteDistribution outDistr = distribution.apply(mapping);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
index e270f12..da6cab5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -27,6 +27,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
@@ -43,6 +44,9 @@ import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrai
 import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
 
 /**
  * Relational expression that combines two relational expressions according to
@@ -92,21 +96,33 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
+        RelCollation leftCollation = TraitUtils.collation(left), rightCollation = TraitUtils.collation(right);
+
+        List<Integer> newRightCollationFields = maxPrefix(rightCollation.getKeys(), joinInfo.leftKeys);
+
+        if (F.isEmpty(newRightCollationFields))
+            return ImmutableList.of();
+
         // We preserve left edge collation only if batch size == 1
         if (variablesSet.size() == 1)
-            return super.deriveCollation(nodeTraits, inputTraits);
-
-        RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
+            nodeTraits = nodeTraits.replace(leftCollation);
+        else
+            nodeTraits = nodeTraits.replace(RelCollations.EMPTY);
 
-        return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(RelCollations.EMPTY))));
+        return ImmutableList.of(Pair.of(nodeTraits, inputTraits));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         // Correlated nested loop requires rewindable right edge.
-
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
         RewindabilityTrait rewindability = TraitUtils.rewindability(left);
@@ -116,15 +132,30 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
     }
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
-        // We preserve left edge collation only if batch size == 1
-        if (variablesSet.size() == 1)
-            return super.passThroughCollation(nodeTraits, inputTraits);
-
+    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
         RelTraitSet left = inputTraits.get(0), right = inputTraits.get(1);
 
+        // Index lookup (collation) is required for right input.
+        RelCollation rightReplace = RelCollations.of(joinInfo.rightKeys);
+
+        // We preserve left edge collation only if batch size == 1
+        if (variablesSet.size() == 1) {
+            Pair<RelTraitSet, List<RelTraitSet>> baseTraits = super.passThroughCollation(nodeTraits, inputTraits);
+
+            return Pair.of(
+                baseTraits.getKey(),
+                ImmutableList.of(
+                    baseTraits.getValue().get(0),
+                    baseTraits.getValue().get(1).replace(rightReplace)
+                )
+            );
+        }
+
         return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
-            ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(RelCollations.EMPTY)));
+            ImmutableList.of(left.replace(RelCollations.EMPTY), right.replace(rightReplace)));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index d5e24c4..d2dbe0c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
@@ -65,7 +66,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
         RelTraitSet traits,
         RelOptTable tbl,
         String idxName) {
-        this(cluster, traits, tbl, idxName, null, null, null, null, null);
+        this(cluster, traits, tbl, idxName, null, null, null, null);
     }
 
     /**
@@ -76,7 +77,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param idxName Index name.
      * @param proj Projects.
      * @param cond Filters.
-     * @param requiredColunms Participating colunms.
+     * @param requiredCols Participating columns.
      */
     public IgniteIndexScan(
         RelOptCluster cluster,
@@ -85,11 +86,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
         String idxName,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable List<RexNode> lowerIdxCond,
-        @Nullable List<RexNode> upperIdxCond,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable IndexConditions idxCond,
+        @Nullable ImmutableBitSet requiredCols
     ) {
-        this(-1L, cluster, traits, tbl, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+        this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols);
     }
 
     /**
@@ -100,7 +100,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param idxName Index name.
      * @param proj Projects.
      * @param cond Filters.
-     * @param requiredColunms Participating colunms.
+     * @param requiredCols Participating colunms.
      */
     private IgniteIndexScan(
         long sourceId,
@@ -110,11 +110,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
         String idxName,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable List<RexNode> lowerIdxCond,
-        @Nullable List<RexNode> upperIdxCond,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable IndexConditions idxCond,
+        @Nullable ImmutableBitSet requiredCols
     ) {
-        super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+        super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
 
         this.sourceId = sourceId;
     }
@@ -138,12 +137,12 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     /** {@inheritDoc} */
     @Override public IgniteRel clone(long sourceId) {
         return new IgniteIndexScan(sourceId, getCluster(), getTraitSet(), getTable(),
-            idxName, projects, condition, lowerCond, upperCond, requiredColumns);
+            idxName, projects, condition, idxCond, requiredColumns);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteIndexScan(sourceId, cluster, getTraitSet(), getTable(),
-            idxName, projects, condition, lowerCond, upperCond, requiredColumns);
+            idxName, projects, condition, idxCond, requiredColumns);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java
new file mode 100644
index 0000000..dc9e062
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexSpool.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
+import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
+
+/**
+ * Relational operator that returns the sorted contents of a table
+ * and allow to lookup rows by specified bounds.
+ */
+public class IgniteIndexSpool extends Spool implements IgniteRel {
+    /** */
+    private final RelCollation collation;
+
+    /** Index condition. */
+    private final IndexConditions idxCond;
+
+    /** Filters. */
+    protected final RexNode condition;
+
+    /** */
+    public IgniteIndexSpool(
+        RelOptCluster cluster,
+        RelTraitSet traits,
+        RelNode input,
+        RelCollation collation,
+        RexNode condition,
+        IndexConditions idxCond
+    ) {
+        super(cluster, traits, input, Type.LAZY, Type.EAGER);
+
+        assert Objects.nonNull(idxCond);
+        assert Objects.nonNull(condition);
+
+        this.idxCond = idxCond;
+        this.condition = condition;
+        this.collation = collation;
+    }
+
+    /**
+     * Constructor used for deserialization.
+     *
+     * @param input Serialized representation.
+     */
+    public IgniteIndexSpool(RelInput input) {
+        this(input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getInputs().get(0),
+            input.getCollation(),
+            input.getExpression("condition"),
+            new IndexConditions(input)
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** */
+    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteIndexSpool(cluster, getTraitSet(), inputs.get(0), collation, condition, idxCond);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Spool copy(RelTraitSet traitSet, RelNode input, Type readType, Type writeType) {
+        return new IgniteIndexSpool(getCluster(), traitSet, input, collation, condition, idxCond);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEnforcer() {
+        return true;
+    }
+
+    /** */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        RelWriter writer = super.explainTerms(pw);
+
+        writer.item("condition", condition);
+        writer.item("collation", collation);
+
+        return idxCond.explainTerms(writer);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        double rowCnt = mq.getRowCount(getInput());
+        double bytesPerRow = getRowType().getFieldCount() * IgniteCost.AVERAGE_FIELD_SIZE;
+        double totalBytes = rowCnt * bytesPerRow;
+        double cpuCost = rowCnt * IgniteCost.ROW_PASS_THROUGH_COST;
+
+        if (idxCond.lowerCondition() != null)
+            cpuCost += Math.log(rowCnt) * IgniteCost.ROW_COMPARISON_COST;
+
+        IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory();
+
+        return costFactory.makeCost(rowCnt, cpuCost, 0, totalBytes, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double estimateRowCount(RelMetadataQuery mq) {
+        return mq.getRowCount(getInput()) * mq.getSelectivity(this, null);
+    }
+
+    /** */
+    public IndexConditions indexCondition() {
+        return idxCond;
+    }
+
+    /** */
+    @Override public RelCollation collation() {
+        return collation;
+    }
+
+    /** */
+    public RexNode condition() {
+        return condition;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
index 534719d..bef81a9 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteLimit.java
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.SingleRel;
@@ -69,6 +70,18 @@ public class IgniteLimit extends SingleRel implements IgniteRel {
         this.fetch = fetch;
     }
 
+    /** */
+    public IgniteLimit(RelInput input) {
+        super(
+            input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getInputs().get(0)
+        );
+
+        offset = input.getExpression("offset");
+        fetch = input.getExpression("fetch");
+    }
+
     /** {@inheritDoc} */
     @Override public final IgniteLimit copy(RelTraitSet traitSet, List<RelNode> inputs) {
         return new IgniteLimit(getCluster(), traitSet, sole(inputs), offset, fetch);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
index 11a7216..a202604 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteMergeJoin.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,8 +31,6 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
@@ -47,6 +44,10 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteC
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
+import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.createCollation;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.isPrefix;
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.maxPrefix;
+
 /** */
 public class IgniteMergeJoin extends AbstractIgniteJoin {
     /** */
@@ -98,7 +99,7 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
             Map<Integer, Integer> leftToRight = joinInfo.pairs().stream()
                 .collect(Collectors.toMap(p -> p.source, p -> p.target));
 
-            newRightCollation = newLeftCollation.stream().map(leftToRight::get).collect(Collectors.toList());
+            newRightCollation = newLeftCollation.subList(0, leftToRight.size()).stream().map(leftToRight::get).collect(Collectors.toList());
         }
         else if (isPrefix(rightCollation.getKeys(), joinInfo.rightKeys)) { // preserve right collation
             newRightCollation = new ArrayList<>(rightCollation.getKeys());
@@ -106,7 +107,7 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
             Map<Integer, Integer> rightToLeft = joinInfo.pairs().stream()
                 .collect(Collectors.toMap(p -> p.target, p -> p.source));
 
-            newLeftCollation = newRightCollation.stream().map(rightToLeft::get).collect(Collectors.toList());
+            newLeftCollation = newRightCollation.subList(0, rightToLeft.size()).stream().map(rightToLeft::get).collect(Collectors.toList());
         }
         else { // generate new collations
             // TODO: generate permutations when there will be multitraits
@@ -226,63 +227,4 @@ public class IgniteMergeJoin extends AbstractIgniteJoin {
         return costFactory.makeCost(rows,
             rows * (IgniteCost.ROW_COMPARISON_COST + IgniteCost.ROW_PASS_THROUGH_COST), 0);
     }
-
-    /**
-     * Returns the longest possible prefix of {@code seq} that could be form from provided {@code elems}.
-     *
-     * @param seq Sequence.
-     * @param elems Elems.
-     * @return The longest possible prefix of {@code seq}.
-     */
-    private static <T> List<T> maxPrefix(List<T> seq, Collection<T> elems) {
-        List<T> res = new ArrayList<>();
-
-        Set<T> elems0 = new HashSet<>(elems);
-
-        for (T e : seq) {
-            if (!elems0.remove(e))
-                break;
-
-            res.add(e);
-        }
-
-        return res;
-    }
-
-    /**
-     * Checks if there is a such permutation of all {@code elems} that is prefix of
-     * provided {@code seq}.
-     *
-     * @param seq Sequence.
-     * @param elems Elems.
-     * @return {@code true} if there is a permutation of all {@code elems} that is prefix of {@code seq}.
-     */
-    private static <T> boolean isPrefix(List<T> seq, Collection<T> elems) {
-        Set<T> elems0 = new HashSet<>(elems);
-
-        if (seq.size() < elems0.size())
-            return false;
-
-        for (T e : seq) {
-            if (!elems0.remove(e))
-                return false;
-
-            if (elems0.isEmpty())
-                break;
-        }
-
-        return true;
-    }
-
-    /**
-     * Creates collations from provided keys.
-     *
-     * @param keys The keys to create collation from.
-     * @return New collation.
-     */
-    private static RelCollation createCollation(List<Integer> keys) {
-        return RelCollations.of(
-            keys.stream().map(RelFieldCollation::new).collect(Collectors.toList())
-        );
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index b86596b..2db2cc1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -119,6 +119,11 @@ public interface IgniteRelVisitor<T> {
     /**
      * See {@link IgniteRelVisitor#visit(IgniteRel)}
      */
+    T visit(IgniteIndexSpool rel);
+
+    /**
+     * See {@link IgniteRelVisitor#visit(IgniteRel)}
+     */
     T visit(IgniteLimit rel);
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
index be16129..fae13ad 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTrimExchange.java
@@ -30,10 +30,8 @@ import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
 
-import static org.apache.calcite.rel.RelDistribution.Type.BROADCAST_DISTRIBUTED;
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
 import static org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils.changeTraits;
 
@@ -55,14 +53,13 @@ public class IgniteTrimExchange extends Exchange implements SourceAwareIgniteRel
         super(cluster, traits, input, distribution);
 
         assert distribution.getType() == HASH_DISTRIBUTED;
-        assert input.getTraitSet().getTrait(DistributionTraitDef.INSTANCE).getType() == BROADCAST_DISTRIBUTED;
 
         this.sourceId = sourceId;
     }
 
     /** */
     public IgniteTrimExchange(RelInput input) {
-        super(changeTraits(input, IgniteConvention.INSTANCE));
+        super(changeTraits(input, IgniteConvention.INSTANCE, input.getDistribution()));
 
         Object srcIdObj = input.get("sourceId");
         if (srcIdObj != null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
index f484558..0747df2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/ProjectableFilterableTableScan.java
@@ -158,7 +158,7 @@ public abstract class ProjectableFilterableTableScan extends TableScan {
         IgniteTypeFactory typeFactory = Commons.typeFactory(getCluster());
         IgniteTable tbl = getTable().unwrap(IgniteTable.class);
 
-        Mappings.TargetMapping mapping = RexUtils.invercePermutation(projects,
+        Mappings.TargetMapping mapping = RexUtils.inversePermutation(projects,
             tbl.getRowType(typeFactory, requiredColumns), true);
 
         RexShuttle shuttle = new RexShuttle() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
index 08c1609..a174cf8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalIndexScan.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.logical;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -26,15 +27,14 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
-import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.mapping.Mappings;
 import org.apache.ignite.internal.processors.query.calcite.rel.AbstractIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
-import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 /** */
@@ -47,27 +47,37 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
         String idxName,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable ImmutableBitSet requiredColumns
     ) {
         IgniteTable tbl = table.unwrap(IgniteTable.class);
         IgniteTypeFactory typeFactory = Commons.typeFactory(cluster);
-        RelDataType rowType = tbl.getRowType(typeFactory, requiredColunms);
+        RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
         RelCollation collation = tbl.getIndex(idxName).collation();
 
-        if (requiredColunms != null) {
-            Mappings.TargetMapping targetMapping = Commons.mapping(requiredColunms,
+        if (requiredColumns != null) {
+            Mappings.TargetMapping targetMapping = Commons.mapping(requiredColumns,
                 tbl.getRowType(typeFactory).getFieldCount());
             collation = collation.apply(targetMapping);
             if (proj != null)
                 collation = TraitUtils.projectCollation(collation, proj, rowType);
         }
 
-        Pair<List<RexNode>, List<RexNode>> pair = RexUtils.buildIndexConditions(cluster, collation, cond, rowType);
-
-        List<RexNode> lowerIdxCond = F.isEmpty(pair.left) ? null : pair.left;
-        List<RexNode> upperIdxCond = F.isEmpty(pair.right) ? null : pair.right;
+        IndexConditions idxCond = RexUtils.buildIndexConditions(
+            cluster,
+            collation,
+            cond,
+            tbl.getRowType(typeFactory),
+            requiredColumns);
 
-        return new IgniteLogicalIndexScan(cluster, traits, table, idxName, proj, cond, lowerIdxCond, upperIdxCond, requiredColunms);
+        return new IgniteLogicalIndexScan(
+            cluster,
+            traits,
+            table,
+            idxName,
+            proj,
+            cond,
+            idxCond,
+            requiredColumns);
     }
 
     /**
@@ -78,9 +88,8 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
      * @param idxName Index name.
      * @param proj Projects.
      * @param cond Filters.
-     * @param lowerCond Lower condition.
-     * @param upperCond Upper condition.
-     * @param requiredColunms Participating colunms.
+     * @param idxCond Index conditions.
+     * @param requiredCols Participating columns.
      */
     private IgniteLogicalIndexScan(
         RelOptCluster cluster,
@@ -89,10 +98,9 @@ public class IgniteLogicalIndexScan extends AbstractIndexScan {
         String idxName,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable List<RexNode> lowerCond,
-        @Nullable List<RexNode> upperCond,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable IndexConditions idxCond,
+        @Nullable ImmutableBitSet requiredCols
     ) {
-        super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, lowerCond, upperCond, requiredColunms);
+        super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
index 3a08413..4992795 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/logical/IgniteLogicalTableScan.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.calcite.rel.logical;
 
 import java.util.List;
+
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
@@ -36,9 +37,9 @@ public class IgniteLogicalTableScan extends ProjectableFilterableTableScan {
         RelOptTable tbl,
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable ImmutableBitSet requiredColumns
     ) {
-        return new IgniteLogicalTableScan(cluster, traits, tbl, proj, cond, requiredColunms);
+        return new IgniteLogicalTableScan(cluster, traits, tbl, proj, cond, requiredColumns);
     }
 
     /**
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
index d4f2da0..d035404 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/CorrelatedNestedLoopJoinRule.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
@@ -117,7 +118,10 @@ public class CorrelatedNestedLoopJoinRule extends ConverterRule {
         RelTraitSet filterInTraits = rel.getRight().getTraitSet().replace(RewindabilityTrait.REWINDABLE);
 
         // Push a filter with batchSize disjunctions
-        relBuilder.push(rel.getRight().copy(filterInTraits, rel.getRight().getInputs())).filter(relBuilder.or(conditionList));
+        relBuilder
+            .push(rel.getRight().copy(filterInTraits, rel.getRight().getInputs()))
+            .filter(relBuilder.or(conditionList));
+
         RelNode right = relBuilder.build();
 
         CorrelationTrait corrTrait = CorrelationTrait.correlations(correlationIds);
@@ -135,7 +139,17 @@ public class CorrelatedNestedLoopJoinRule extends ConverterRule {
         RelNode left = convert(rel.getLeft(), leftInTraits);
         right = convert(right, rightInTraits);
 
-        call.transformTo(new IgniteCorrelatedNestedLoopJoin(cluster, outTraits, left, right, rel.getCondition(), correlationIds, joinType));
+        call.transformTo(
+            new IgniteCorrelatedNestedLoopJoin(
+                cluster,
+                outTraits,
+                left,
+                right,
+                rel.getCondition(),
+                correlationIds,
+                joinType
+            )
+        );
     }
 
     /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java
new file mode 100644
index 0000000..eb989ea
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/FilterSpoolMergeRule.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.rule;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.core.Spool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Rule that pushes filter into the spool.
+ */
+public class FilterSpoolMergeRule extends RelRule<FilterSpoolMergeRule.Config> {
+    /** Instance. */
+    public static final RelOptRule INSTANCE = Config.DEFAULT.toRule();
+
+    /** */
+    private FilterSpoolMergeRule(Config cfg) {
+        super(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMatch(RelOptRuleCall call) {
+        final IgniteFilter filter = call.rel(0);
+        final IgniteTableSpool spool = call.rel(1);
+
+        RelOptCluster cluster = spool.getCluster();
+
+        RelTraitSet trait = spool.getTraitSet();
+        CorrelationTrait filterCorr = TraitUtils.correlation(filter);
+
+        if (filterCorr.correlated())
+            trait = trait.replace(filterCorr);
+
+        RelNode input = spool.getInput();
+
+        IndexConditions idxCond = RexUtils.buildIndexConditions(
+            cluster,
+            TraitUtils.collation(input),
+            filter.getCondition(),
+            spool.getRowType(),
+            null
+        );
+
+        if (F.isEmpty(idxCond.lowerCondition()) && F.isEmpty(idxCond.upperCondition()))
+            return;
+
+        RelCollation collation = TraitUtils.collation(input);
+        
+        RelNode res = new IgniteIndexSpool(
+            cluster,
+            trait.replace(collation),
+            convert(input, input.getTraitSet().replace(collation)),
+            collation,
+            filter.getCondition(),
+            idxCond
+        );
+
+        call.transformTo(res);
+    }
+
+    /** */
+    @SuppressWarnings("ClassNameSameAsAncestorName")
+    public interface Config extends RelRule.Config {
+        /** */
+        Config DEFAULT = RelRule.Config.EMPTY
+            .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+            .withDescription("FilterSpoolMergeRule")
+            .as(FilterSpoolMergeRule.Config.class)
+            .withOperandFor(IgniteFilter.class, IgniteTableSpool.class);
+
+        /** Defines an operand tree for the given classes. */
+        default Config withOperandFor(Class<? extends Filter> filterClass, Class<? extends Spool> spoolClass) {
+            return withOperandSupplier(
+                o0 -> o0.operand(filterClass)
+                    .oneInput(o1 -> o1.operand(spoolClass)
+                        .anyInputs()
+                    )
+            )
+                .as(Config.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override default FilterSpoolMergeRule toRule() {
+            return new FilterSpoolMergeRule(this);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index 5d6f546..7c3226d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -36,20 +36,35 @@ import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTableScan> extends AbstractIgniteConverterRule<T> {
     /** Instance. */
     public static final LogicalScanConverterRule<IgniteLogicalIndexScan> INDEX_SCAN =
-        new LogicalScanConverterRule<IgniteLogicalIndexScan>(IgniteLogicalIndexScan.class, "LogicalTableScanConverterRule") {
+        new LogicalScanConverterRule<IgniteLogicalIndexScan>(IgniteLogicalIndexScan.class, "LogicalIndexScanConverterRule") {
             /** {@inheritDoc} */
-            @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, IgniteLogicalIndexScan rel) {
-                return new IgniteIndexScan(rel.getCluster(), rel.getTraitSet().replace(IgniteConvention.INSTANCE),
-                    rel.getTable(), rel.indexName(), rel.projects(), rel.condition(), rel.lowerCondition(),
-                    rel.upperCondition(), rel.requiredColumns());
+            @Override protected PhysicalNode convert(
+                RelOptPlanner planner,
+                RelMetadataQuery mq,
+                IgniteLogicalIndexScan rel
+            ) {
+                return new IgniteIndexScan(
+                    rel.getCluster(),
+                    rel.getTraitSet().replace(IgniteConvention.INSTANCE),
+                    rel.getTable(),
+                    rel.indexName(),
+                    rel.projects(),
+                    rel.condition(),
+                    rel.indexConditions(),
+                    rel.requiredColumns()
+                );
             }
         };
 
     /** Instance. */
     public static final LogicalScanConverterRule<IgniteLogicalTableScan> TABLE_SCAN =
-        new LogicalScanConverterRule<IgniteLogicalTableScan>(IgniteLogicalTableScan.class, "LogicalIndexScanConverterRule") {
+        new LogicalScanConverterRule<IgniteLogicalTableScan>(IgniteLogicalTableScan.class, "LogicalTableScanConverterRule") {
             /** {@inheritDoc} */
-            @Override protected PhysicalNode convert(RelOptPlanner planner, RelMetadataQuery mq, IgniteLogicalTableScan rel) {
+            @Override protected PhysicalNode convert(
+                RelOptPlanner planner,
+                RelMetadataQuery mq,
+                IgniteLogicalTableScan rel
+            ) {
                 RelTraitSet traits = rel.getTraitSet().replace(IgniteConvention.INSTANCE);
 
                 Set<CorrelationId> corrIds = RexUtils.extractCorrelationIds(rel.condition());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
index f92de43..95bb653 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
@@ -50,10 +50,22 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             "ProjectIndexScanMergeRule"
         ) {
             /** {@inheritDoc} */
-            @Override protected IgniteLogicalIndexScan createNode(RelOptCluster cluster, IgniteLogicalIndexScan scan,
-                RelTraitSet traits, List<RexNode> projections, RexNode cond, ImmutableBitSet requiredColunms) {
-                return IgniteLogicalIndexScan.create(cluster, traits, scan.getTable(), scan.indexName(), projections,
-                    cond, requiredColunms);
+            @Override protected IgniteLogicalIndexScan createNode(
+                RelOptCluster cluster,
+                IgniteLogicalIndexScan scan,
+                RelTraitSet traits,
+                List<RexNode> projections,
+                RexNode cond,
+                ImmutableBitSet requiredColumns
+            ) {
+                return IgniteLogicalIndexScan.create(
+                    cluster,
+                    traits,
+                    scan.getTable(),
+                    scan.indexName(),
+                    projections,
+                    cond, requiredColumns
+                );
             }
         };
 
@@ -65,15 +77,34 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             "ProjectTableScanMergeRule"
         ) {
             /** {@inheritDoc} */
-            @Override protected IgniteLogicalTableScan createNode(RelOptCluster cluster, IgniteLogicalTableScan scan,
-                RelTraitSet traits, List<RexNode> projections, RexNode cond, ImmutableBitSet requiredColunms) {
-                return IgniteLogicalTableScan.create(cluster, traits, scan.getTable(), projections, cond, requiredColunms);
+            @Override protected IgniteLogicalTableScan createNode(
+                RelOptCluster cluster,
+                IgniteLogicalTableScan scan,
+                RelTraitSet traits,
+                List<RexNode> projections,
+                RexNode cond,
+                ImmutableBitSet requiredColumns
+            ) {
+                return IgniteLogicalTableScan.create(
+                    cluster,
+                    traits,
+                    scan.getTable(),
+                    projections,
+                    cond,
+                    requiredColumns
+                );
             }
         };
 
     /** */
-    protected abstract T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, List<RexNode> projections,
-                                    RexNode cond, ImmutableBitSet requiredColunms);
+    protected abstract T createNode(
+        RelOptCluster cluster,
+        T scan,
+        RelTraitSet traits,
+        List<RexNode> projections,
+        RexNode cond,
+        ImmutableBitSet requiredColumns
+    );
 
     /**
      * Constructor.
@@ -134,9 +165,9 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             }
         }.apply(cond);
 
-        ImmutableBitSet requiredColunms = builder.build();
+        ImmutableBitSet requiredColumns = builder.build();
 
-        Mappings.TargetMapping targetMapping = Commons.mapping(requiredColunms,
+        Mappings.TargetMapping targetMapping = Commons.mapping(requiredColumns,
             tbl.getRowType(typeFactory).getFieldCount());
 
         projects = new RexShuttle() {
@@ -145,7 +176,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             }
         }.apply(projects);
 
-        if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColunms), true))
+        if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColumns), true))
             projects = null;
 
         cond = new RexShuttle() {
@@ -154,6 +185,6 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             }
         }.apply(cond);
 
-        call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColunms));
+        call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColumns));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
index 8b6ad1a..f99a9be 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteIndex.java
@@ -82,11 +82,11 @@ public class IgniteIndex {
         Supplier<Row> lowerIdxConditions,
         Supplier<Row> upperIdxConditions,
         Function<Row, Row> rowTransformer,
-        @Nullable ImmutableBitSet requiredColunms) {
+        @Nullable ImmutableBitSet requiredColumns) {
         UUID localNodeId = execCtx.planningContext().localNodeId();
         if (group.nodeIds().contains(localNodeId))
             return new IndexScan<>(execCtx, table().descriptor(), idx, group.partitions(localNodeId),
-                filters, lowerIdxConditions, upperIdxConditions, rowTransformer, requiredColunms);
+                filters, lowerIdxConditions, upperIdxConditions, rowTransformer, requiredColumns);
 
         return Collections.emptyList();
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
index 61073d4..3e06946 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/TableDescriptorImpl.java
@@ -207,7 +207,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
         ExecutionContext<Row> ectx,
         CacheDataRow row,
         RowHandler.RowFactory<Row> factory,
-        @Nullable ImmutableBitSet requiredColunms
+        @Nullable ImmutableBitSet requiredColumns
     ) throws IgniteCheckedException {
         RowHandler<Row> handler = factory.handler();
 
@@ -215,9 +215,9 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
 
         Row res = factory.create();
 
-        assert handler.columnCount(res) == (requiredColunms == null ? descriptors.length : requiredColunms.cardinality());
+        assert handler.columnCount(res) == (requiredColumns == null ? descriptors.length : requiredColumns.cardinality());
 
-        if (requiredColunms == null) {
+        if (requiredColumns == null) {
             for (int i = 0; i < descriptors.length; i++) {
                 ColumnDescriptor desc = descriptors[i];
 
@@ -226,7 +226,7 @@ public class TableDescriptorImpl extends NullInitializerExpressionFactory
             }
         }
         else {
-            for (int i = 0, j = requiredColunms.nextSetBit(0); j != -1; j = requiredColunms.nextSetBit(j + 1), i++) {
+            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++) {
                 ColumnDescriptor desc = descriptors[j];
 
                 handler.set(i, res, TypeUtils.toInternal(ectx,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
index 9ea6bb1..3856c83 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/TraitUtils.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -35,6 +36,7 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
@@ -436,6 +438,18 @@ public class TraitUtils {
         return processed;
     }
 
+    /**
+     * Creates collations from provided keys.
+     *
+     * @param keys The keys to create collation from.
+     * @return New collation.
+     */
+    public static RelCollation createCollation(List<Integer> keys) {
+        return RelCollations.of(
+            keys.stream().map(RelFieldCollation::new).collect(Collectors.toList())
+        );
+    }
+
     /** */
     private static class PropagationContext {
         /** */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 867882f..7dc7a32 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+
 import org.apache.calcite.config.CalciteSystemProperty;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.Context;
@@ -325,10 +327,57 @@ public final class Commons {
     }
 
     /** */
-    public static Mappings.TargetMapping inverceMapping(ImmutableBitSet bitSet, int sourceSize) {
+    public static Mappings.TargetMapping inverseMapping(ImmutableBitSet bitSet, int sourceSize) {
         Mapping mapping = Mappings.create(MappingType.INVERSE_FUNCTION, sourceSize, bitSet.cardinality());
         for (Ord<Integer> ord : Ord.zip(bitSet))
             mapping.set(ord.e, ord.i);
         return mapping;
     }
+
+    /**
+     * Checks if there is a such permutation of all {@code elems} that is prefix of
+     * provided {@code seq}.
+     *
+     * @param seq Sequence.
+     * @param elems Elems.
+     * @return {@code true} if there is a permutation of all {@code elems} that is prefix of {@code seq}.
+     */
+    public static <T> boolean isPrefix(List<T> seq, Collection<T> elems) {
+        Set<T> elems0 = new HashSet<>(elems);
+
+        if (seq.size() < elems0.size())
+            return false;
+
+        for (T e : seq) {
+            if (!elems0.remove(e))
+                return false;
+
+            if (elems0.isEmpty())
+                break;
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns the longest possible prefix of {@code seq} that could be form from provided {@code elems}.
+     *
+     * @param seq Sequence.
+     * @param elems Elems.
+     * @return The longest possible prefix of {@code seq}.
+     */
+    public static <T> List<T> maxPrefix(List<T> seq, Collection<T> elems) {
+        List<T> res = new ArrayList<>();
+
+        Set<T> elems0 = new HashSet<>(elems);
+
+        for (T e : seq) {
+            if (!elems0.remove(e))
+                break;
+
+            res.add(e);
+        }
+
+        return res;
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java
new file mode 100644
index 0000000..230652b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IndexConditions.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index conditions and bounds holder.
+ * Conditions are not printed to terms (serialized). They are used only to calculate selectivity.
+ */
+public class IndexConditions {
+    /** */
+    private final List<RexNode> lowerCond;
+
+    /** */
+    private final List<RexNode> upperCond;
+
+    /** */
+    private final List<RexNode> lowerBound;
+
+    /** */
+    private final List<RexNode> upperBound;
+
+    /** */
+    public IndexConditions() {
+        this(null, null, null, null);
+    }
+
+    /**
+     */
+    public IndexConditions(
+        @Nullable List<RexNode> lowerCond,
+        @Nullable List<RexNode> upperCond,
+        @Nullable List<RexNode> lowerBound,
+        @Nullable List<RexNode> upperBound
+    ) {
+        this.lowerCond = lowerCond;
+        this.upperCond = upperCond;
+        this.lowerBound = lowerBound;
+        this.upperBound = upperBound;
+    }
+
+    /** */
+    public IndexConditions(RelInput input) {
+        lowerCond = null;
+        upperCond = null;
+        lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
+        upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
+    }
+
+    /**
+     * @return Lower index condition.
+     */
+    public List<RexNode> lowerCondition() {
+        return lowerCond;
+    }
+
+    /**
+     * @return Upper index condition.
+     */
+    public List<RexNode> upperCondition() {
+        return upperCond;
+    }
+
+    /**
+     * @return Lower index bounds (a row with values at the index columns).
+     */
+    public List<RexNode> lowerBound() {
+        return lowerBound;
+    }
+
+    /**
+     * @return Upper index bounds (a row with values at the index columns).
+     */
+    public List<RexNode> upperBound() {
+        return upperBound;
+    }
+
+    /** */
+    public ImmutableIntList keys() {
+        if (upperBound == null && lowerBound == null)
+            return ImmutableIntList.of();
+
+        List<Integer> keys = new ArrayList<>();
+
+        int cols = lowerBound != null ? lowerBound.size() : upperBound.size();
+
+        for (int i = 0; i < cols; ++i ) {
+            if (upperBound != null && RexUtils.isNotNull(upperBound.get(i))
+                || lowerBound != null && RexUtils.isNotNull(lowerBound.get(i)))
+                keys.add(i);
+        }
+
+        return ImmutableIntList.copyOf(keys);
+    }
+
+    /**
+     * Describes index bounds.
+     *
+     * @param pw Plan writer
+     * @return Plan writer for fluent-explain pattern
+     */
+    public RelWriter explainTerms(RelWriter pw) {
+        return pw
+            .itemIf("lower", lowerBound, !F.isEmpty(lowerBound))
+            .itemIf("upper", upperBound, !F.isEmpty(upperBound));
+    }
+
+    /** */
+    @Override public String toString() {
+        return S.toString(IndexConditions.class, this,
+            "lower", lowerCond,
+            "upper", upperCond,
+            "lowerBound", lowerBound,
+            "upperBound", upperBound);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
index f67b44e..da3a94e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RexUtils.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import com.google.common.collect.ImmutableList;
+
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptPredicateList;
@@ -58,8 +58,8 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ControlFlowException;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Litmus;
-import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 import org.apache.calcite.util.mapping.MappingType;
 import org.apache.calcite.util.mapping.Mappings;
@@ -159,16 +159,22 @@ public class RexUtils {
     /**
      * Builds index conditions.
      */
-    public static Pair<List<RexNode>, List<RexNode>> buildIndexConditions(RelOptCluster cluster, RelCollation collation, RexNode condition, RelDataType rowType) {
+    public static IndexConditions buildIndexConditions(
+        RelOptCluster cluster,
+        RelCollation collation,
+        RexNode condition,
+        RelDataType rowType,
+        ImmutableBitSet requiredColumns
+    ) {
         if (condition == null || collation == null || collation.getFieldCollations().isEmpty())
-            return Pair.of(ImmutableList.of(), ImmutableList.of());
+            return new IndexConditions();
 
         condition = RexUtil.toCnf(builder(cluster), condition);
 
         Map<Integer, List<RexCall>> fieldsToPredicates = mapPredicatesToFields(condition, cluster);
 
         if (F.isEmpty(fieldsToPredicates))
-            return Pair.of(ImmutableList.of(), ImmutableList.of());
+            return new IndexConditions();
 
         List<RexNode> lower = new ArrayList<>();
         List<RexNode> upper = new ArrayList<>();
@@ -248,7 +254,25 @@ public class RexUtils {
             }
         }
 
-        return Pair.of(lower, upper);
+        Mappings.TargetMapping mapping = null;
+
+        if (requiredColumns != null)
+            mapping = Commons.inverseMapping(requiredColumns, rowType.getFieldCount());
+
+        List<RexNode> lowerBound = null;
+        List<RexNode> upperBound = null;
+
+        if (!F.isEmpty(lower))
+            lowerBound = asBound(cluster, lower, rowType, mapping);
+        else
+            lower = null;
+
+        if (!F.isEmpty(upper))
+            upperBound = asBound(cluster, upper, rowType, mapping);
+        else
+            upper = null;
+
+        return new IndexConditions(lower, upper, lowerBound, upperBound);
     }
 
     /** */
@@ -262,7 +286,7 @@ public class RexUtils {
                 continue;
 
             RexCall predCall = (RexCall)rexNode;
-            RexLocalRef ref = (RexLocalRef)extractRef(predCall);
+            RexSlot ref = (RexSlot)extractRef(predCall);
 
             if (ref == null)
                 continue;
@@ -288,9 +312,9 @@ public class RexUtils {
         leftOp = RexUtil.removeCast(leftOp);
         rightOp = RexUtil.removeCast(rightOp);
 
-        if (leftOp instanceof RexLocalRef && idxOpSupports(rightOp))
+        if ((leftOp instanceof RexLocalRef || leftOp instanceof RexInputRef) && idxOpSupports(rightOp))
             return leftOp;
-        else if (rightOp instanceof RexLocalRef && idxOpSupports(leftOp))
+        else if ((rightOp instanceof RexLocalRef || rightOp instanceof RexInputRef) && idxOpSupports(leftOp))
             return rightOp;
 
         return null;
@@ -302,7 +326,7 @@ public class RexUtils {
 
         rightOp = RexUtil.removeCast(rightOp);
 
-        return rightOp.isA(SqlKind.LOCAL_REF);
+        return rightOp.isA(SqlKind.LOCAL_REF) || rightOp.isA(SqlKind.INPUT_REF);
     }
 
     /** */
@@ -325,6 +349,14 @@ public class RexUtils {
     }
 
     /** */
+    public static boolean isNotNull(RexNode op) {
+        if (op == null)
+            return false;
+
+        return !(op instanceof RexLiteral) || !((RexLiteral)op).isNull();
+    }
+
+    /** */
     public static List<RexNode> asBound(RelOptCluster cluster, Iterable<RexNode> idxCond, RelDataType rowType, @Nullable Mappings.TargetMapping mapping) {
         if (F.isEmpty(idxCond))
             return null;
@@ -337,7 +369,7 @@ public class RexUtils {
             assert pred instanceof RexCall;
 
             RexCall call = (RexCall)pred;
-            RexLocalRef ref = (RexLocalRef)RexUtil.removeCast(call.operands.get(0));
+            RexSlot ref = (RexSlot)RexUtil.removeCast(call.operands.get(0));
             RexNode cond = RexUtil.removeCast(call.operands.get(1));
 
             assert idxOpSupports(cond) : cond;
@@ -367,7 +399,7 @@ public class RexUtils {
     }
 
     /** */
-    public static Mappings.TargetMapping invercePermutation(List<RexNode> nodes, RelDataType inputRowType, boolean local) {
+    public static Mappings.TargetMapping inversePermutation(List<RexNode> nodes, RelDataType inputRowType, boolean local) {
         final Mappings.TargetMapping mapping =
             Mappings.create(MappingType.INVERSE_FUNCTION, nodes.size(), inputRowType.getFieldCount());
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
similarity index 91%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
index ec6b926..a040acc 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/TableSpoolTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/IndexSpoolIntegrationTest.java
@@ -19,10 +19,13 @@ package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,10 +44,10 @@ import static org.junit.Assert.assertThat;
 
 
 /**
- * IndexSpool test.
+ * Index spool test.
  */
 @RunWith(Parameterized.class)
-public class TableSpoolTest extends GridCommonAbstractTest {
+public class IndexSpoolIntegrationTest extends GridCommonAbstractTest {
     /** Rows. */
     private static final int[] ROWS = {1, 10, 512, 513, 2000};
 
@@ -95,7 +98,8 @@ public class TableSpoolTest extends GridCommonAbstractTest {
             .setKeyFieldName("ID")
             .addQueryField("ID", Integer.class.getName(), null)
             .addQueryField("JID", Integer.class.getName(), null)
-            .addQueryField("VAL", String.class.getName(), null);
+            .addQueryField("VAL", String.class.getName(), null)
+            .setIndexes(Collections.singletonList(new QueryIndex("JID")));
 
         QueryEntity part1 = new QueryEntity()
             .setTableName("TEST1")
@@ -104,7 +108,8 @@ public class TableSpoolTest extends GridCommonAbstractTest {
             .setKeyFieldName("ID")
             .addQueryField("ID", Integer.class.getName(), null)
             .addQueryField("JID", Integer.class.getName(), null)
-            .addQueryField("VAL", String.class.getName(), null);
+            .addQueryField("VAL", String.class.getName(), null)
+            .setIndexes(Collections.singletonList(new QueryIndex("JID")));
 
         return super.getConfiguration(igniteInstanceName)
             .setCacheConfiguration(
@@ -131,7 +136,7 @@ public class TableSpoolTest extends GridCommonAbstractTest {
         List<FieldsQueryCursor<List<?>>> cursors = engine.query(
             null,
             "PUBLIC",
-            "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter') */" +
+            "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */" +
                 "T0.val, T1.val FROM TEST0 as T0 " +
                 "JOIN TEST1 as T1 on T0.jid = T1.jid ",
             X.EMPTY_OBJECT_ARRAY
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java
new file mode 100644
index 0000000..31a1d6a
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeTreeIndexTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class RuntimeTreeIndexTest extends GridCommonAbstractTest {
+    /** */
+    private static final int UNIQUE_GROUPS = 10_000;
+
+    /** */
+    private static final int[] NOT_UNIQUE_ROWS_IN_GROUP = new int[] {1, 10};
+
+    /** */
+    private static final Pair<Class<?>[], ImmutableIntList>[] ROW_TYPES = new Pair[] {
+        new Pair(new Class<?>[] {int.class, int.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, long.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, String.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Date.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Time.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, Timestamp.class, int.class}, ImmutableIntList.of(1)),
+        new Pair(new Class<?>[] {int.class, String.class, Time.class, Date.class, Timestamp.class, int.class},
+            ImmutableIntList.of(1, 2, 3, 4))
+    };
+
+    /** Search count. */
+    private static final int SEARCH_CNT = UNIQUE_GROUPS / 100;
+
+    /** */
+    @Test
+    public void test() throws Exception {
+        IgniteTypeFactory tf = new IgniteTypeFactory();
+
+        List<Pair<RelDataType, ImmutableIntList>> testIndexes = Arrays.stream(ROW_TYPES)
+            .map(rt -> Pair.of(TypeUtils.createRowType(tf, rt.getKey()), rt.getValue()))
+            .collect(Collectors.toList());
+
+        for (Pair<RelDataType, ImmutableIntList> testIdx : testIndexes) {
+            for (int notUnique : NOT_UNIQUE_ROWS_IN_GROUP) {
+                RuntimeTreeIndex<Object[]> idx0 = generate(testIdx.getKey(), testIdx.getValue(), notUnique);
+
+                int rowIdLow = ThreadLocalRandom.current().nextInt(UNIQUE_GROUPS * notUnique);
+                int rowIdUp = rowIdLow + ThreadLocalRandom.current().nextInt(UNIQUE_GROUPS * notUnique - rowIdLow);
+
+                for (int searchNum = 0; searchNum < SEARCH_CNT; ++searchNum) {
+                    Object[] lower = generateFindRow(rowIdLow, testIdx.getKey(), notUnique, testIdx.getValue());
+                    Object[] upper = generateFindRow(rowIdUp, testIdx.getKey(), notUnique, testIdx.getValue());
+
+                    GridCursor<Object[]> cur = idx0.find(lower, upper, null);
+
+                    int rows = 0;
+                    while (cur.next()) {
+                        cur.get();
+
+                        rows++;
+                    }
+
+                    assertEquals("Invalid results [rowType=" + testIdx.getKey() + ", notUnique=" + notUnique +
+                            ", rowIdLow=" + rowIdLow + ", rowIdUp=" + rowIdUp,
+                        (rowIdUp / notUnique - rowIdLow / notUnique + 1) * notUnique,
+                        rows);
+                }
+            }
+        }
+    }
+
+    /** */
+    private RuntimeTreeIndex<Object[]> generate(RelDataType rowType, final List<Integer> idxCols, int notUnique) {
+        RuntimeTreeIndex<Object[]> idx = new RuntimeTreeIndex<>(
+            new ExecutionContext<>(
+                null,
+                PlanningContext.builder()
+                    .logger(log())
+                    .build(),
+                null,
+                null,
+                ArrayRowHandler.INSTANCE,
+                null),
+            RelCollations.of(ImmutableIntList.copyOf(idxCols)),
+            (o1, o2) -> {
+                for (int colIdx : idxCols) {
+                    int res = ((Comparable)o1[colIdx]).compareTo(o2[colIdx]);
+
+                    if (res != 0)
+                        return res;
+                }
+
+                return 0;
+            });
+
+        BitSet rowIds = new BitSet(UNIQUE_GROUPS);
+
+        // First random fill
+        for (int i = 0; i < UNIQUE_GROUPS * notUnique; ++i) {
+            int rowId = ThreadLocalRandom.current().nextInt(UNIQUE_GROUPS);
+
+            if (!rowIds.get(rowId)) {
+                idx.push(generateRow(rowId, rowType, notUnique));
+                rowIds.set(rowId);
+            }
+        }
+
+        for (int i = 0; i < UNIQUE_GROUPS * notUnique; ++i) {
+            if (!rowIds.get(i))
+                idx.push(generateRow(i, rowType, notUnique));
+        }
+
+        return idx;
+    }
+
+    /** */
+    private Object[] generateRow(int rowId, RelDataType rowType, int notUnique) {
+        Object[] row = new Object[rowType.getFieldCount()];
+
+        for (int i = 0; i < rowType.getFieldCount(); ++i)
+            row[i] = generateValue(rowId, rowType.getFieldList().get(i), notUnique);
+
+        return row;
+    }
+
+    /** */
+    private Object[] generateFindRow(int rowId, RelDataType rowType, int notUnique, final List<Integer> idxCols) {
+        Object[] row = generateRow(rowId, rowType, notUnique);
+
+        for (int i = 0; i < rowType.getFieldCount(); ++i) {
+            if (!idxCols.contains(i))
+                row[i] = null;
+        }
+
+        return row;
+    }
+
+    /** */
+    private Object generateValue(int rowId, RelDataTypeField field, int notUnique) {
+        long mod = rowId / notUnique;
+        long baseDate = 1_000_000L;
+
+        switch (field.getType().getSqlTypeName().getFamily()) {
+            case NUMERIC:
+                return mod;
+
+            case CHARACTER:
+                return "val " + String.format("%07d", mod);
+
+            case DATE:
+                return new Date(baseDate + mod * 10_000);
+
+            case TIME:
+                return new Time(mod);
+
+            case TIMESTAMP:
+                return new Timestamp(baseDate + mod);
+
+            default:
+                assert false : "Not supported type for test: " + field.getType();
+                return null;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 9523d6a..02bc083 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -28,11 +29,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -52,6 +56,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -317,4 +322,99 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
             // No-op;
         }
     }
+
+    /**
+     *
+     */
+    public static class TestTable implements Iterable<Object[]> {
+        /** */
+        private int rowsCnt;
+
+        /** */
+        private RelDataType rowType;
+
+        /** */
+        private Function<Integer, Object>[] fieldCreators;
+
+        /** */
+        TestTable(int rowsCnt, RelDataType rowType) {
+            this.rowsCnt = rowsCnt;
+            this.rowType = rowType;
+
+            fieldCreators = rowType.getFieldList().stream()
+                .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
+                    switch (t.getType().getSqlTypeName().getFamily()) {
+                        case NUMERIC:
+                            return TestTable::intField;
+
+                        case CHARACTER:
+                            return TestTable::stringField;
+
+                        default:
+                            assert false : "Not supported type for test: " + t;
+                            return null;
+                    }
+                })
+                .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
+        }
+
+        /** */
+        private static Object stringField(Integer rowNum) {
+            return "val_" + rowNum;
+        }
+
+        /** */
+        private static Object intField(Integer rowNum) {
+            return rowNum;
+        }
+
+        /** */
+        private Object[] createRow(int rowNum) {
+            Object[] row = new Object[rowType.getFieldCount()];
+
+            for (int i = 0; i < fieldCreators.length; ++i)
+                row[i] = fieldCreators[i].apply(rowNum);
+
+            return row;
+        }
+
+        /** {@inheritDoc} */
+        @NotNull @Override public Iterator<Object[]> iterator() {
+            return new Iterator<Object[]>() {
+                private int curRow;
+
+                @Override public boolean hasNext() {
+                    return curRow < rowsCnt;
+                }
+
+                @Override public Object[] next() {
+                    return createRow(curRow++);
+                }
+            };
+        }
+    }
+
+    /** */
+    public static class RootRewindable<Row> extends RootNode<Row> {
+        /** */
+        public RootRewindable(ExecutionContext<Row> ctx, RelDataType rowType) {
+            super(ctx, rowType);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void rewindInternal() {
+            GridTestUtils.setFieldValue(this, RootNode.class, "waiting", 0);
+            GridTestUtils.setFieldValue(this, RootNode.class, "closed", false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void closeInternal() {
+            // No-op
+        }
+
+        /** */
+        public void closeRewindableRoot() {
+            super.closeInternal();
+        }
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index 71d2249..729b79b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -19,12 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -32,7 +29,6 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
@@ -49,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.jetbrains.annotations.NotNull;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -1054,74 +1049,6 @@ public class ExecutionTest extends AbstractExecutionTest {
     /**
      *
      */
-    @Test
-    public void testTableSpool() {
-        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
-        IgniteTypeFactory tf = ctx.getTypeFactory();
-        RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
-
-        int[] leftSizes = {1, 99, 100, 101, 512, 513, 2000};
-        int[] rightSizes = {1, 99, 100, 101, 512, 513, 2000};
-        int[] rightBufSizes = {1, 100, 512};
-
-        for (int rightBufSize : rightBufSizes) {
-            for (int leftSize : leftSizes) {
-                for (int rightSize : rightSizes) {
-                    log.info("Check: rightBufSize=" + rightBufSize + ", leftSize=" + leftSize + ", rightSize=" + rightSize);
-
-                    ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new TestTable(leftSize, rowType));
-                    ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new TestTable(rightSize, rowType) {
-                        boolean first = true;
-
-                        @Override public @NotNull Iterator<Object[]> iterator() {
-                            assertTrue("Rewind right", first);
-
-                            first = false;
-                            return super.iterator();
-                        }
-                    });
-
-                    TableSpoolNode<Object[]> rightSpool = new TableSpoolNode<>(ctx, rowType);
-
-                    rightSpool.register(Arrays.asList(right));
-
-                    RelDataType joinRowType = TypeUtils.createRowType(
-                        tf,
-                        int.class, String.class, int.class,
-                        int.class, String.class, int.class);
-
-                    CorrelatedNestedLoopJoinNode<Object[]> join = new CorrelatedNestedLoopJoinNode<>(
-                        ctx,
-                        joinRowType,
-                        r -> r[0].equals(r[3]),
-                        ImmutableSet.of(new CorrelationId(0)));
-
-                    GridTestUtils.setFieldValue(join, "rightInBufferSize", rightBufSize);
-
-                    join.register(Arrays.asList(left, rightSpool));
-
-                    RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
-                    root.register(join);
-
-                    int cnt = 0;
-                    while (root.hasNext()) {
-                        root.next();
-
-                        cnt++;
-                    }
-
-                    assertEquals(
-                        "Invalid result size. [left=" + leftSize + ", right=" + rightSize + ", results=" + cnt,
-                        min(leftSize, rightSize),
-                        cnt);
-                }
-            }
-        }
-    }
-
-    /**
-     *
-     */
     private Object[] row(Object... fields) {
         return fields;
     }
@@ -1155,75 +1082,4 @@ public class ExecutionTest extends AbstractExecutionTest {
             }
         };
     }
-
-    /**
-     *
-     */
-    public static class TestTable implements Iterable<Object[]> {
-        /** */
-        private int rowsCnt;
-
-        /** */
-        private RelDataType rowType;
-
-        /** */
-        private Function<Integer, Object>[] fieldCreators;
-
-        /** */
-        TestTable(int rowsCnt, RelDataType rowType) {
-            this.rowsCnt = rowsCnt;
-            this.rowType = rowType;
-
-            fieldCreators = rowType.getFieldList().stream()
-                .map((Function<RelDataTypeField, Function<Integer, Object>>) (t) -> {
-                    switch (t.getType().getSqlTypeName().getFamily()) {
-                        case NUMERIC:
-                            return TestTable::intField;
-
-                        case CHARACTER:
-                            return TestTable::stringField;
-
-                        default:
-                            assert false : "Not supported type for test: " + t;
-                            return null;
-                    }
-                })
-                .collect(Collectors.toList()).toArray(new Function[rowType.getFieldCount()]);
-        }
-
-        /** */
-        private static Object stringField(Integer rowNum) {
-            return "val_" + rowNum;
-        }
-
-        /** */
-        private static Object intField(Integer rowNum) {
-            return rowNum;
-        }
-
-        /** */
-        private Object[] createRow(int rowNum) {
-            Object[] row = new Object[rowType.getFieldCount()];
-
-            for (int i = 0; i < fieldCreators.length; ++i)
-                row[i] = fieldCreators[i].apply(rowNum);
-
-            return row;
-        }
-
-        /** {@inheritDoc} */
-        @NotNull @Override public Iterator<Object[]> iterator() {
-            return new Iterator<Object[]>() {
-                private int curRow;
-
-                @Override public boolean hasNext() {
-                    return curRow < rowsCnt;
-                }
-
-                @Override public Object[] next() {
-                    return createRow(curRow++);
-                }
-            };
-        }
-    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java
new file mode 100644
index 0000000..e0b0806
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IndexSpoolExecutionTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.lang.GridTuple4;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class IndexSpoolExecutionTest extends AbstractExecutionTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Before
+    @Override public void setup() throws Exception {
+        nodesCnt = 1;
+        super.setup();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testIndexSpool() throws Exception {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+        int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
+
+        for (int size : sizes) {
+            // (filter, lower, upper, expected result size)
+            GridTuple4<Predicate<Object[]>, Object[], Object[], Integer>[] testBounds;
+
+            if (size == 1) {
+                testBounds = new GridTuple4[] {
+                    new GridTuple4(null, new Object[] {null, null, null}, new Object[] {null, null, null}, 1),
+                    new GridTuple4(null, new Object[] {0, null, null}, new Object[] {0, null, null}, 1)
+                };
+            }
+            else {
+                testBounds = new GridTuple4[] {
+                    new GridTuple4(
+                        null,
+                        new Object[] {null, null, null},
+                        new Object[] {null, null, null},
+                        size
+                    ),
+                    new GridTuple4(
+                        null,
+                        new Object[] {size / 2, null, null},
+                        new Object[] {size / 2, null, null},
+                        1
+                    ),
+                    new GridTuple4(
+                        null,
+                        new Object[] {size / 2 + 1, null, null},
+                        new Object[] {size / 2 + 1, null, null},
+                        1
+                    ),
+                    new GridTuple4(
+                        null,
+                        new Object[] {size / 2, null, null},
+                        new Object[] {null, null, null},
+                        size - size / 2
+                    ),
+                    new GridTuple4(
+                        null,
+                        new Object[] {null, null, null},
+                        new Object[] {size / 2, null, null},
+                        size / 2 + 1
+                    ),
+                    new GridTuple4<>(
+                        (Predicate<Object[]>)(r -> ((int)r[0]) < size / 2),
+                        new Object[] {null, null, null},
+                        new Object[] {size / 2, null, null},
+                        size / 2
+                    ),
+                };
+            }
+
+            log.info("Check: size=" + size);
+
+            ScanNode<Object[]> scan = new ScanNode<>(ctx, rowType, new TestTable(size, rowType) {
+                boolean first = true;
+
+                @Override public @NotNull Iterator<Object[]> iterator() {
+                    assertTrue("Rewind right", first);
+
+                    first = false;
+                    return super.iterator();
+                }
+            });
+
+            Object[] lower = new Object[3];
+            Object[] upper = new Object[3];
+            TestPredicate testFilter = new TestPredicate();
+
+            IndexSpoolNode<Object[]> spool = new IndexSpoolNode<>(
+                ctx,
+                rowType,
+                RelCollations.of(ImmutableIntList.of(0)),
+                (o1, o2) -> o1[0] != null ? ((Comparable)o1[0]).compareTo(o2[0]) : 0,
+                testFilter,
+                () -> lower,
+                () -> upper
+            );
+
+            spool.register(Arrays.asList(scan));
+
+            RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
+            root.register(spool);
+
+            for (GridTuple4<Predicate<Object[]>, Object[], Object[], Integer> bound : testBounds) {
+                log.info("Check: bound=" + bound);
+
+                // Set up bounds
+                testFilter.delegate = bound.get1();
+                System.arraycopy(bound.get2(), 0, lower, 0, lower.length);
+                System.arraycopy(bound.get3(), 0, upper, 0, upper.length);
+
+                int cnt = 0;
+
+                while (root.hasNext()) {
+                    root.next();
+
+                    cnt++;
+                }
+
+                assertEquals(
+                    "Invalid result size",
+                    (int)bound.get4(),
+                    cnt);
+
+                root.rewind();
+            }
+
+            root.closeRewindableRoot();
+        }
+    }
+
+    /** */
+    static class TestPredicate implements Predicate<Object[]> {
+        /** */
+        Predicate<Object[]> delegate;
+
+        /** {@inheritDoc} */
+        @Override public boolean test(Object[] objects) {
+            if (delegate == null)
+                return true;
+            else
+                return delegate.test(objects);
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
new file mode 100644
index 0000000..6a08401
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TableSpoolExecutionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings("TypeMayBeWeakened")
+@WithSystemProperty(key = "calcite.debug", value = "true")
+public class TableSpoolExecutionTest extends AbstractExecutionTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Before
+    @Override public void setup() throws Exception {
+        nodesCnt = 1;
+        super.setup();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testTableSpool() throws Exception {
+        ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
+        IgniteTypeFactory tf = ctx.getTypeFactory();
+        RelDataType rowType = TypeUtils.createRowType(tf, int.class, String.class, int.class);
+
+        int inBufSize = U.field(AbstractNode.class, "IN_BUFFER_SIZE");
+
+        int[] sizes = {1, inBufSize / 2 - 1, inBufSize / 2, inBufSize / 2 + 1, inBufSize, inBufSize + 1, inBufSize * 4};
+//        int[] sizes = {inBufSize * 4};
+        int rewindCnts = 32;
+
+        for (int size : sizes) {
+            log.info("Check: size=" + size);
+
+            ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new TestTable(size, rowType) {
+                boolean first = true;
+
+                @Override public @NotNull Iterator<Object[]> iterator() {
+                    assertTrue("Rewind table", first);
+
+                    first = false;
+                    return super.iterator();
+                }
+            });
+
+            TableSpoolNode<Object[]> spool = new TableSpoolNode<>(ctx, rowType);
+
+            spool.register(Arrays.asList(right));
+
+            RootRewindable<Object[]> root = new RootRewindable<>(ctx, rowType);
+            root.register(spool);
+
+            for (int i = 0; i < rewindCnts; ++i) {
+                int cnt = 0;
+
+                while (root.hasNext()) {
+                    root.next();
+
+                    cnt++;
+                }
+
+                assertEquals(
+                    "Invalid result size",
+                    size,
+                    cnt);
+
+                root.rewind();
+            }
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
new file mode 100644
index 0000000..6e5b112
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelReferentialConstraint;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
+import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
+import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
+import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Cloner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonWriter.toJson;
+
+/**
+ *
+ */
+//@WithSystemProperty(key = "calcite.debug", value = "true")
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened", "unchecked"})
+public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
+    /** */
+    protected List<UUID> nodes;
+
+    /** */
+    protected List<QueryTaskExecutorImpl> executors;
+
+    /** */
+    protected volatile Throwable lastE;
+
+    /** */
+    @Before
+    public void setup() {
+        nodes = new ArrayList<>(4);
+
+        for (int i = 0; i < 4; i++)
+            nodes.add(UUID.randomUUID());
+    }
+
+    /** */
+    @After
+    public void tearDown() throws Throwable {
+        if (!F.isEmpty(executors))
+            executors.forEach(QueryTaskExecutorImpl::tearDown);
+
+        if (lastE != null)
+            throw lastE;
+    }
+
+    /** */
+    interface TestVisitor {
+        public void visit(RelNode node, int ordinal, RelNode parent);
+    }
+
+    /** */
+    public static class TestRelVisitor extends RelVisitor {
+        /** */
+        final TestVisitor v;
+
+        /** */
+        TestRelVisitor(TestVisitor v) {
+            this.v = v;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+            v.visit(node, ordinal, parent);
+
+            super.visit(node, ordinal, parent);
+        }
+    }
+
+    /** */
+    protected static void relTreeVisit(RelNode n, TestVisitor v) {
+        v.visit(n, -1, null);
+
+        n.childrenAccept(new TestRelVisitor(v));
+    }
+
+    /** */
+    public static <T extends RelNode> T findFirstNode(RelNode plan, Predicate<RelNode> pred) {
+        return F.first(findNodes(plan, pred));
+    }
+
+    /** */
+    public static <T extends RelNode> List<T> findNodes(RelNode plan, Predicate<RelNode> pred) {
+        List<T> ret = new ArrayList<>();
+
+        plan.childrenAccept(
+            new RelVisitor() {
+                @Override public void visit(RelNode node, int ordinal, RelNode parent) {
+                    if (pred.test(node))
+                        ret.add((T)node);
+
+                    super.visit(node, ordinal, parent);
+                }
+            }
+        );
+
+        return ret;
+    }
+
+    /** */
+    public static <T extends RelNode> Predicate<RelNode> byClass(Class<T> cls) {
+        return node -> cls.isInstance(node);
+    }
+
+    /** */
+    public static <T extends RelNode> Predicate<RelNode> byClass(Class<T> cls, Predicate<RelNode> pred) {
+        return node -> cls.isInstance(node) && pred.test(node);
+    }
+
+    /** */
+    protected IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .query(sql)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            assertNotNull(rel);
+
+            // Transformation chain
+            RelTraitSet desired = rel.getTraitSet()
+                .replace(IgniteConvention.INSTANCE)
+                .replace(IgniteDistributions.single())
+                .replace(CorrelationTrait.UNCORRELATED)
+                .replace(RewindabilityTrait.ONE_WAY)
+                .simplify();
+
+            planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
+
+            try {
+                IgniteRel res = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+
+                return res;
+            }
+            catch (Throwable ex) {
+                System.err.println(planner.dump());
+
+                throw ex;
+            }
+        }
+    }
+
+    /** */
+    protected RelNode originalLogicalTree(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .query(sql)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        RelRoot relRoot;
+
+        try (IgnitePlanner planner = ctx.planner()) {
+            assertNotNull(planner);
+
+            String qry = ctx.query();
+
+            assertNotNull(qry);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(qry);
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            assertNotNull(rel);
+
+            return rel;
+        }
+    }
+
+    /** */
+    protected void checkSplitAndSerialization(IgniteRel rel, IgniteSchema publicSchema) {
+        rel = Cloner.clone(rel);
+
+        SchemaPlus schema = createRootSchema(false)
+            .add("PUBLIC", publicSchema);
+
+        List<Fragment> fragments = new Splitter().go(rel);
+        List<String> serialized = new ArrayList<>(fragments.size());
+
+        for (Fragment fragment : fragments)
+            serialized.add(toJson(fragment.root()));
+
+        assertNotNull(serialized);
+
+        RelTraitDef<?>[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE,
+            RelCollationTraitDef.INSTANCE,
+            RewindabilityTraitDef.INSTANCE,
+            CorrelationTraitDef.INSTANCE
+        };
+
+        PlanningContext ctx = PlanningContext.builder()
+            .localNodeId(F.first(nodes))
+            .originatingNodeId(F.first(nodes))
+            .parentContext(Contexts.empty())
+            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
+                .defaultSchema(schema)
+                .traitDefs(traitDefs)
+                .build())
+            .logger(log)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .build();
+
+        List<RelNode> deserializedNodes = new ArrayList<>();
+
+        try (IgnitePlanner ignored = ctx.planner()) {
+            for (String s : serialized) {
+                RelJsonReader reader = new RelJsonReader(ctx.cluster(), ctx.catalogReader());
+                deserializedNodes.add(reader.read(s));
+            }
+        }
+
+        List<RelNode> expectedRels = fragments.stream()
+            .map(f -> f.root())
+            .collect(Collectors.toList());
+
+        assertEquals("Invalid deserialization fragments count", expectedRels.size(), deserializedNodes.size());
+
+        for (int i = 0; i < expectedRels.size(); ++i) {
+            RelNode expected = expectedRels.get(i);
+            RelNode deserialized = deserializedNodes.get(i);
+
+            clearTraits(expected);
+            clearTraits(deserialized);
+
+            if (!expected.deepEquals(deserialized))
+            assertTrue(
+                "Invalid serialization / deserialization.\n" +
+                    "Expected:\n" + RelOptUtil.toString(expected) +
+                    "Deserialized:\n" + RelOptUtil.toString(deserialized),
+                expected.deepEquals(deserialized)
+            );
+        }
+    }
+
+    /** */
+    protected void clearTraits(RelNode rel) {
+        GridTestUtils.setFieldValue(rel, AbstractRelNode.class, "traitSet", RelTraitSet.createEmpty());
+        rel.getInputs().forEach(this::clearTraits);
+    }
+
+    /** */
+    protected List<UUID> intermediateMapping(@NotNull AffinityTopologyVersion topVer, boolean single, @Nullable Predicate<ClusterNode> filter) {
+        return single ? select(nodes, 0) : select(nodes, 0, 1, 2, 3);
+    }
+
+    /** */
+    public static <T> List<T> select(List<T> src, int... idxs) {
+        ArrayList<T> res = new ArrayList<>(idxs.length);
+
+        for (int idx : idxs)
+            res.add(src.get(idx));
+
+        return res;
+    }
+
+    /** */
+    protected <Row> Row row(ExecutionContext<Row> ctx, ImmutableBitSet requiredColumns, Object... fields) {
+        Type[] types = new Type[fields.length];
+        for (int i = 0; i < fields.length; i++)
+            types[i] = fields[i] == null ? Object.class : fields[i].getClass();
+
+        if (requiredColumns == null) {
+            for (int i = 0; i < fields.length; i++)
+                types[i] = fields[i] == null ? Object.class : fields[i].getClass();
+        }
+        else {
+            for (int i = 0, j = requiredColumns.nextSetBit(0); j != -1; j = requiredColumns.nextSetBit(j + 1), i++)
+                types[i] = fields[i] == null ? Object.class : fields[i].getClass();
+        }
+
+        return ctx.rowHandler().factory(types).create(fields);
+    }
+
+    /** */
+    abstract static class TestTable implements IgniteTable {
+        /** */
+        private final RelProtoDataType protoType;
+
+        /** */
+        private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+        /** */
+        private final RewindabilityTrait rewindable;
+
+        /** */
+        private final double rowCnt;
+
+        /** */
+        TestTable(RelDataType type) {
+            this(type, RewindabilityTrait.REWINDABLE);
+        }
+
+        /** */
+        TestTable(RelDataType type, RewindabilityTrait rewindable) {
+            this(type, rewindable, 100.0);
+        }
+
+        /** */
+        TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
+            protoType = RelDataTypeImpl.proto(type);
+            this.rewindable = rewindable;
+            this.rowCnt = rowCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
+            RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+                .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
+                .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
+
+            return IgniteLogicalTableScan.create(cluster, traitSet, relOptTbl, null, null, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName) {
+            RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
+                .replaceIf(DistributionTraitDef.INSTANCE, this::distribution)
+                .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
+                .replaceIf(RelCollationTraitDef.INSTANCE, getIndex(idxName)::collation);
+
+            return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTbl, idxName, null, null, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+            RelDataType rowType = protoType.apply(typeFactory);
+
+            if (bitSet != null) {
+                RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
+                for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
+                    b.add(rowType.getFieldList().get(i));
+                rowType = b.build();
+            }
+
+            return rowType;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Statistic getStatistic() {
+            return new Statistic() {
+                /** {@inheritDoc */
+                @Override public Double getRowCount() {
+                    return rowCnt;
+                }
+
+                /** {@inheritDoc */
+                @Override public boolean isKey(ImmutableBitSet cols) {
+                    return false;
+                }
+
+                /** {@inheritDoc */
+                @Override public List<ImmutableBitSet> getKeys() {
+                    throw new AssertionError();
+                }
+
+                /** {@inheritDoc */
+                @Override public List<RelReferentialConstraint> getReferentialConstraints() {
+                    throw new AssertionError();
+                }
+
+                /** {@inheritDoc */
+                @Override public List<RelCollation> getCollations() {
+                    return Collections.emptyList();
+                }
+
+                /** {@inheritDoc */
+                @Override public RelDistribution getDistribution() {
+                    throw new AssertionError();
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public <Row> Iterable<Row> scan(
+            ExecutionContext<Row> execCtx,
+            CollocationGroup group, Predicate<Row> filter,
+            Function<Row, Row> transformer,
+            ImmutableBitSet bitSet
+        ) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Schema.TableType getJdbcTableType() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isRolledUp(String col) {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean rolledUpColumnValidInsideAgg(
+            String column,
+            SqlCall call,
+            SqlNode parent,
+            CalciteConnectionConfig config
+        ) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public CollocationGroup colocationGroup(PlanningContext ctx) {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteDistribution distribution() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public TableDescriptor descriptor() {
+            throw new AssertionError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, IgniteIndex> indexes() {
+            return Collections.unmodifiableMap(indexes);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void addIndex(IgniteIndex idxTbl) {
+            indexes.put(idxTbl.name(), idxTbl);
+        }
+
+        /** */
+        public TestTable addIndex(RelCollation collation, String name) {
+            indexes.put(name, new IgniteIndex(collation, name, null, this));
+
+            return this;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteIndex getIndex(String idxName) {
+            return indexes.get(idxName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeIndex(String idxName) {
+            throw new AssertionError();
+        }
+    }
+
+    /** */
+    static class TestMessageServiceImpl extends MessageServiceImpl {
+        /** */
+        private final TestIoManager mgr;
+
+        /** */
+        TestMessageServiceImpl(GridTestKernalContext kernal, TestIoManager mgr) {
+            super(kernal);
+            this.mgr = mgr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void send(UUID nodeId, CalciteMessage msg) {
+            mgr.send(localNodeId(), nodeId, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean alive(UUID nodeId) {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void prepareMarshal(Message msg) {
+            // No-op;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void prepareUnmarshal(Message msg) {
+            // No-op;
+        }
+    }
+
+    /** */
+    class TestFailureProcessor extends FailureProcessor {
+        /** */
+        TestFailureProcessor(GridTestKernalContext kernal) {
+            super(kernal);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean process(FailureContext failureCtx) {
+            Throwable ex = failureContext().error();
+            log().error(ex.getMessage(), ex);
+
+            lastE = ex;
+
+            return true;
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java
new file mode 100644
index 0000000..6c1e3bf
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
+    /**
+     * Check equi-join. CorrelatedNestedLoopJoinTest is applicable for it.
+     */
+    @Test
+    public void testValidIndexExpressions() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "T0",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.broadcast();
+                }
+            }
+        );
+
+        publicSchema.addTable(
+            "T1",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.broadcast();
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx")
+        );
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid = t1.jid";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule"
+        );
+
+        assertNotNull(phys);
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteIndexScan idxScan = findFirstNode(phys, byClass(IgniteIndexScan.class));
+
+        List<RexNode> lBound = idxScan.lowerBound();
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), lBound);
+        assertEquals(3, lBound.size());
+
+        assertTrue(((RexLiteral)lBound.get(0)).isNull());
+        assertTrue(((RexLiteral)lBound.get(2)).isNull());
+        assertTrue(lBound.get(1) instanceof RexFieldAccess);
+
+        List<RexNode> uBound = idxScan.upperBound();
+
+        assertNotNull("Invalid plan\n" + RelOptUtil.toString(phys), uBound);
+        assertEquals(3, uBound.size());
+
+        assertTrue(((RexLiteral)uBound.get(0)).isNull());
+        assertTrue(((RexLiteral)uBound.get(2)).isNull());
+        assertTrue(uBound.get(1) instanceof RexFieldAccess);
+    }
+
+    /**
+     * Check join with not equi condition.
+     * Current implementation of the CorrelatedNestedLoopJoinTest is not applicable for such case.
+     */
+    @Test
+    public void testInvalidIndexExpressions() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "T0",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.broadcast();
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t0_jid_idx")
+        );
+
+        publicSchema.addTable(
+            "T1",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.broadcast();
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx")
+        );
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid + 2 > t1.jid * 2";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule"
+        );
+
+        assertNotNull(phys);
+
+        checkSplitAndSerialization(phys, publicSchema);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java
new file mode 100644
index 0000000..d9eaae6
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexSpoolPlannerTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexSpool;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+/**
+ *
+ */
+@SuppressWarnings({"FieldCanBeLocal"})
+public class IndexSpoolPlannerTest extends AbstractPlannerTest {
+    /**
+     * Check equi-join on not collocated fields.
+     * CorrelatedNestedLoopJoinTest is applicable for this case only with IndexSpool.
+     */
+    @Test
+    public void test() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "T0",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T0", "hash");
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t0_jid_idx")
+        );
+
+        publicSchema.addTable(
+            "T1",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T1", "hash");
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid_idx")
+        );
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid = t1.jid";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter"
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+        List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+        assertNotNull(lBound);
+        assertEquals(3, lBound.size());
+
+        assertTrue(((RexLiteral)lBound.get(0)).isNull());
+        assertTrue(((RexLiteral)lBound.get(2)).isNull());
+        assertTrue(lBound.get(1) instanceof RexFieldAccess);
+
+        List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+        assertNotNull(uBound);
+        assertEquals(3, uBound.size());
+
+        assertTrue(((RexLiteral)uBound.get(0)).isNull());
+        assertTrue(((RexLiteral)uBound.get(2)).isNull());
+        assertTrue(uBound.get(1) instanceof RexFieldAccess);
+    }
+
+    /**
+     * Check case when exists index (collation) isn't applied not for whole join condition
+     * but may be used by part of condition.
+     */
+    @Test
+    public void testPartialIndexForCondition() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "T0",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID0", f.createJavaType(Integer.class))
+                    .add("JID1", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T0", "hash");
+                }
+            }
+        );
+
+        publicSchema.addTable(
+            "T1",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID0", f.createJavaType(Integer.class))
+                    .add("JID1", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T1", "hash");
+                }
+            }
+                .addIndex(RelCollations.of(ImmutableIntList.of(1, 0)), "t1_jid0_idx")
+        );
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid0 = t1.jid0 and t0.jid1 = t1.jid1";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter"
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+        List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+        assertNotNull(lBound);
+        assertEquals(4, lBound.size());
+
+        assertTrue(((RexLiteral)lBound.get(0)).isNull());
+        assertTrue(((RexLiteral)lBound.get(2)).isNull());
+        assertTrue(((RexLiteral)lBound.get(3)).isNull());
+        assertTrue(lBound.get(1) instanceof RexFieldAccess);
+
+        List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+        assertNotNull(uBound);
+        assertEquals(4, uBound.size());
+
+        assertTrue(((RexLiteral)uBound.get(0)).isNull());
+        assertTrue(((RexLiteral)lBound.get(2)).isNull());
+        assertTrue(((RexLiteral)lBound.get(3)).isNull());
+        assertTrue(uBound.get(1) instanceof RexFieldAccess);
+    }
+
+    /**
+     * Check equi-join on not collocated fields without indexes.
+     */
+    @Test
+    public void testSourceWithoutCollation() throws Exception {
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        publicSchema.addTable(
+            "T0",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T0", "hash");
+                }
+            }
+        );
+
+        publicSchema.addTable(
+            "T1",
+            new TestTable(
+                new RelDataTypeFactory.Builder(f)
+                    .add("ID", f.createJavaType(Integer.class))
+                    .add("JID", f.createJavaType(Integer.class))
+                    .add("VAL", f.createJavaType(String.class))
+                    .build()) {
+
+                @Override public IgniteDistribution distribution() {
+                    return IgniteDistributions.affinity(0, "T1", "hash");
+                }
+            }
+        );
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid = t1.jid";
+
+        IgniteRel phys = physicalPlan(
+            sql,
+            publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter"
+        );
+
+        checkSplitAndSerialization(phys, publicSchema);
+
+        IgniteIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteIndexSpool.class));
+
+        assertTrue(idxSpool.getInput() instanceof IgniteSort);
+
+        List<RexNode> lBound = idxSpool.indexCondition().lowerBound();
+
+        assertNotNull(lBound);
+        assertEquals(3, lBound.size());
+
+        assertTrue(((RexLiteral)lBound.get(0)).isNull());
+        assertTrue(((RexLiteral)lBound.get(2)).isNull());
+        assertTrue(lBound.get(1) instanceof RexFieldAccess);
+
+        List<RexNode> uBound = idxSpool.indexCondition().upperBound();
+
+        assertNotNull(uBound);
+        assertEquals(3, uBound.size());
+
+        assertTrue(((RexLiteral)uBound.get(0)).isNull());
+        assertTrue(((RexLiteral)uBound.get(2)).isNull());
+        assertTrue(uBound.get(1) instanceof RexFieldAccess);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
similarity index 84%
rename from modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
rename to modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7f8a4b8..740af2b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -15,15 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite;
+package org.apache.ignite.internal.processors.query.calcite.planner;
 
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,39 +29,23 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelDistribution;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelReferentialConstraint;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeImpl;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -76,7 +56,6 @@ import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.Outbox;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
 import org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader;
-import org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import org.apache.ignite.internal.processors.query.calcite.metadata.CollocationGroup;
@@ -95,13 +74,8 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteLimit;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.TableDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
@@ -113,15 +87,10 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.calcite.tools.Frameworks.createRootSchema;
@@ -135,35 +104,7 @@ import static org.apache.ignite.internal.processors.query.calcite.externalize.Re
  */
 //@WithSystemProperty(key = "calcite.debug", value = "true")
 @SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
-public class PlannerTest extends GridCommonAbstractTest {
-    /** */
-    private List<UUID> nodes;
-
-    /** */
-    private List<QueryTaskExecutorImpl> executors;
-
-    /** */
-    private volatile Throwable lastE;
-
-    /** */
-    @Before
-    public void setup() {
-        nodes = new ArrayList<>(4);
-
-        for (int i = 0; i < 4; i++)
-            nodes.add(UUID.randomUUID());
-    }
-
-    /** */
-    @After
-    public void tearDown() throws Throwable {
-        if (!F.isEmpty(executors))
-            executors.forEach(QueryTaskExecutorImpl::tearDown);
-
-        if (lastE != null)
-            throw lastE;
-    }
-
+public class PlannerTest extends AbstractPlannerTest {
     /**
      * @throws Exception If failed.
      */
@@ -797,11 +738,11 @@ public class PlannerTest extends GridCommonAbstractTest {
                         Supplier<Row> lowerIdxConditions,
                         Supplier<Row> upperIdxConditions,
                         Function<Row, Row> rowTransformer,
-                        @Nullable ImmutableBitSet requiredColunms
+                        @Nullable ImmutableBitSet requiredColumns
                     ) {
                         return Linq4j.asEnumerable(Arrays.asList(
-                            row(execCtx, requiredColunms, 0, "Igor", 0),
-                            row(execCtx, requiredColunms, 1, "Roman", 0)
+                            row(execCtx, requiredColumns, 0, "Igor", 0),
+                            row(execCtx, requiredColumns, 1, "Roman", 0)
                         ));
                     }
                 };
@@ -837,11 +778,11 @@ public class PlannerTest extends GridCommonAbstractTest {
                         Supplier<Row> lowerIdxConditions,
                         Supplier<Row> upperIdxConditions,
                         Function<Row, Row> rowTransformer,
-                        @Nullable ImmutableBitSet requiredColunms
+                        @Nullable ImmutableBitSet requiredColumns
                     ) {
                         return Linq4j.asEnumerable(Arrays.asList(
-                            row(execCtx, requiredColunms, 0, "Calcite", 1),
-                            row(execCtx, requiredColunms, 1, "Ignite", 1)
+                            row(execCtx, requiredColumns, 0, "Calcite", 1),
+                            row(execCtx, requiredColumns, 1, "Ignite", 1)
                         ));
                     }
                 };
@@ -2649,7 +2590,7 @@ public class PlannerTest extends GridCommonAbstractTest {
                 .replace(CorrelationTrait.UNCORRELATED)
                 .simplify();
 
-            RelNode phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
+            IgniteRel phys = planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
 
             assertNotNull(phys);
             assertEquals(
@@ -2658,6 +2599,8 @@ public class PlannerTest extends GridCommonAbstractTest {
                     "  IgniteTableScan(table=[[PUBLIC, DEPT]], requiredColumns=[{0}])\n" +
                     "  IgniteTableScan(table=[[PUBLIC, EMP]], filters=[=(CAST(+($cor1.DEPTNO, $t0)):INTEGER, 2)], requiredColumns=[{2}])\n",
                 RelOptUtil.toString(phys));
+
+            checkSplitAndSerialization(phys, publicSchema);
         }
     }
 
@@ -2703,7 +2646,7 @@ public class PlannerTest extends GridCommonAbstractTest {
 
         String sql = "select * from dept d join emp e on d.deptno = e.deptno and e.name = d.name order by e.name, d.deptno";
 
-        RelNode phys = physicalPlan(sql, publicSchema);
+        IgniteRel phys = physicalPlan(sql, publicSchema);
 
         assertNotNull(phys);
         assertEquals("" +
@@ -2711,6 +2654,8 @@ public class PlannerTest extends GridCommonAbstractTest {
                 "  IgniteIndexScan(table=[[PUBLIC, DEPT]], index=[dep_idx])\n" +
                 "  IgniteIndexScan(table=[[PUBLIC, EMP]], index=[emp_idx])\n",
             RelOptUtil.toString(phys));
+
+        checkSplitAndSerialization(phys, publicSchema);
     }
 
     /** */
@@ -2768,128 +2713,6 @@ public class PlannerTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     @Test
-    public void tableSpoolDistributed() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable t0 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("JID", f.createJavaType(Integer.class))
-                .add("VAL", f.createJavaType(String.class))
-                .build()) {
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "T0", "hash");
-            }
-        };
-
-        TestTable t1 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("JID", f.createJavaType(Integer.class))
-                .add("VAL", f.createJavaType(String.class))
-                .build()) {
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.affinity(0, "T1", "hash");
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("T0", t0);
-        publicSchema.addTable("T1", t1);
-
-        String sql = "select * " +
-            "from t0 " +
-            "join t1 on t0.jid = t1.jid";
-
-        RelNode phys = physicalPlan(sql, publicSchema, "NestedLoopJoinConverter", "MergeJoinConverter");
-
-        assertNotNull(phys);
-
-        AtomicInteger spoolCnt = new AtomicInteger();
-
-        phys.childrenAccept(
-            new RelVisitor() {
-                @Override public void visit(RelNode node, int ordinal, RelNode parent) {
-                    if (node instanceof IgniteTableSpool)
-                        spoolCnt.incrementAndGet();
-
-                    super.visit(node, ordinal, parent);
-                }
-            }
-        );
-
-        assertEquals(1, spoolCnt.get());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
-    public void tableSpoolBroadcastNotRewindable() throws Exception {
-        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
-
-        TestTable t0 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("JID", f.createJavaType(Integer.class))
-                .add("VAL", f.createJavaType(String.class))
-                .build(),
-            RewindabilityTrait.ONE_WAY) {
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
-        TestTable t1 = new TestTable(
-            new RelDataTypeFactory.Builder(f)
-                .add("ID", f.createJavaType(Integer.class))
-                .add("JID", f.createJavaType(Integer.class))
-                .add("VAL", f.createJavaType(String.class))
-                .build(),
-            RewindabilityTrait.ONE_WAY) {
-
-            @Override public IgniteDistribution distribution() {
-                return IgniteDistributions.broadcast();
-            }
-        };
-
-        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
-
-        publicSchema.addTable("T0", t0);
-        publicSchema.addTable("T1", t1);
-
-        String sql = "select * " +
-            "from t0 " +
-            "join t1 on t0.jid = t1.jid";
-
-        RelNode phys = physicalPlan(sql, publicSchema, "NestedLoopJoinConverter", "MergeJoinConverter");
-
-        assertNotNull(phys);
-
-        AtomicInteger spoolCnt = new AtomicInteger();
-
-        phys.childrenAccept(
-            new RelVisitor() {
-                @Override public void visit(RelNode node, int ordinal, RelNode parent) {
-                    if (node instanceof IgniteTableSpool)
-                        spoolCnt.incrementAndGet();
-
-                    super.visit(node, ordinal, parent);
-                }
-            }
-        );
-
-        assertEquals(1, spoolCnt.get());
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @Test
     public void testLimit() throws Exception {
         IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
 
@@ -2910,7 +2733,7 @@ public class PlannerTest extends GridCommonAbstractTest {
         String sql = "SELECT * FROM TEST OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
 
         {
-            RelNode phys = physicalPlan(sql, publicSchema);
+            IgniteRel phys = physicalPlan(sql, publicSchema);
 
             assertNotNull(phys);
 
@@ -2928,12 +2751,14 @@ public class PlannerTest extends GridCommonAbstractTest {
 
             assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, limit.get());
             assertFalse("Invalid plan: \n" + RelOptUtil.toString(phys), sort.get());
+
+            checkSplitAndSerialization(phys, publicSchema);
         }
 
         sql = "SELECT * FROM TEST ORDER BY ID OFFSET 10 ROWS FETCH FIRST 10 ROWS ONLY";
 
         {
-            RelNode phys = physicalPlan(sql, publicSchema);
+            IgniteRel phys = physicalPlan(sql, publicSchema);
 
             assertNotNull(phys);
 
@@ -2951,344 +2776,8 @@ public class PlannerTest extends GridCommonAbstractTest {
 
             assertEquals("Invalid plan: \n" + RelOptUtil.toString(phys), 1, limit.get());
             assertTrue("Invalid plan: \n" + RelOptUtil.toString(phys), sort.get());
-        }
-    }
-
-    /** */
-    interface TestVisitor {
-        public void visit(RelNode node, int ordinal, RelNode parent);
-    }
-
-    /** */
-    private static class TestRelVisitor extends RelVisitor {
-        /** */
-        final TestVisitor v;
-
-        /** */
-        TestRelVisitor(TestVisitor v) {
-            this.v = v;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void visit(RelNode node, int ordinal, RelNode parent) {
-            v.visit(node, ordinal, parent);
-
-            super.visit(node, ordinal, parent);
-        }
-    }
-
-    /** */
-    protected static void relTreeVisit(RelNode n, TestVisitor v) {
-        v.visit(n, -1, null);
-
-        n.childrenAccept(new TestRelVisitor(v));
-    }
-
-    /** */
-    private IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
-        SchemaPlus schema = createRootSchema(false)
-            .add("PUBLIC", publicSchema);
-
-        RelTraitDef<?>[] traitDefs = {
-            DistributionTraitDef.INSTANCE,
-            ConventionTraitDef.INSTANCE,
-            RelCollationTraitDef.INSTANCE,
-            RewindabilityTraitDef.INSTANCE,
-            CorrelationTraitDef.INSTANCE
-        };
-
-        PlanningContext ctx = PlanningContext.builder()
-            .localNodeId(F.first(nodes))
-            .originatingNodeId(F.first(nodes))
-            .parentContext(Contexts.empty())
-            .frameworkConfig(newConfigBuilder(FRAMEWORK_CONFIG)
-                .defaultSchema(schema)
-                .traitDefs(traitDefs)
-                .build())
-            .logger(log)
-            .query(sql)
-            .topologyVersion(AffinityTopologyVersion.NONE)
-            .build();
-
-        RelRoot relRoot;
-
-        try (IgnitePlanner planner = ctx.planner()) {
-            assertNotNull(planner);
-
-            String qry = ctx.query();
-
-            assertNotNull(qry);
-
-            // Parse
-            SqlNode sqlNode = planner.parse(qry);
-
-            // Validate
-            sqlNode = planner.validate(sqlNode);
-
-            // Convert to Relational operators graph
-            relRoot = planner.rel(sqlNode);
-
-            RelNode rel = relRoot.rel;
-
-            assertNotNull(rel);
-
-            // Transformation chain
-            RelTraitSet desired = rel.getTraitSet()
-                .replace(IgniteConvention.INSTANCE)
-                .replace(IgniteDistributions.single())
-                .simplify();
-
-            planner.setDisabledRules(ImmutableSet.copyOf(disabledRules));
-
-            return planner.transform(PlannerPhase.OPTIMIZATION, desired, rel);
-        }
-    }
-
-    /** */
-    private List<UUID> intermediateMapping(@NotNull AffinityTopologyVersion topVer, boolean single, @Nullable Predicate<ClusterNode> filter) {
-        return single ? select(nodes, 0) : select(nodes, 0, 1, 2, 3);
-    }
-
-    /** */
-    private static <T> List<T> select(List<T> src, int... idxs) {
-        ArrayList<T> res = new ArrayList<>(idxs.length);
-
-        for (int idx : idxs)
-            res.add(src.get(idx));
-
-        return res;
-    }
-
-    /** */
-    private <Row> Row row(ExecutionContext<Row> ctx, ImmutableBitSet requiredColunms, Object... fields) {
-        Type[] types = new Type[fields.length];
-        for (int i = 0; i < fields.length; i++)
-            types[i] = fields[i] == null ? Object.class : fields[i].getClass();
-
-        if (requiredColunms == null) {
-            for (int i = 0; i < fields.length; i++)
-                types[i] = fields[i] == null ? Object.class : fields[i].getClass();
-        }
-        else {
-            for (int i = 0, j = requiredColunms.nextSetBit(0); j != -1; j = requiredColunms.nextSetBit(j + 1), i++)
-                types[i] = fields[i] == null ? Object.class : fields[i].getClass();
-        }
-
-        return ctx.rowHandler().factory(types).create(fields);
-    }
-
-    /** */
-    private abstract static class TestTable implements IgniteTable {
-        /** */
-        private final RelProtoDataType protoType;
-
-        /** */
-        private final Map<String, IgniteIndex> indexes = new HashMap<>();
-
-        /** */
-        private final RewindabilityTrait rewindable;
-
-        /** */
-        private final double rowCnt;
-
-        /** */
-        private TestTable(RelDataType type) {
-            this(type, RewindabilityTrait.REWINDABLE);
-        }
-
-        /** */
-        private TestTable(RelDataType type, RewindabilityTrait rewindable) {
-            this(type, rewindable, 100.0);
-        }
-
-        /** */
-        private TestTable(RelDataType type, RewindabilityTrait rewindable, double rowCnt) {
-            protoType = RelDataTypeImpl.proto(type);
-            this.rewindable = rewindable;
-            this.rowCnt = rowCnt;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteLogicalTableScan toRel(RelOptCluster cluster, RelOptTable relOptTbl) {
-            RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
-                .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
-                .replaceIf(DistributionTraitDef.INSTANCE, this::distribution);
-
-            return IgniteLogicalTableScan.create(cluster, traitSet, relOptTbl, null, null, null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteLogicalIndexScan toRel(RelOptCluster cluster, RelOptTable relOptTbl, String idxName) {
-            RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE)
-                .replaceIf(DistributionTraitDef.INSTANCE, this::distribution)
-                .replaceIf(RewindabilityTraitDef.INSTANCE, () -> rewindable)
-                .replaceIf(RelCollationTraitDef.INSTANCE, getIndex(idxName)::collation);
-
-            return IgniteLogicalIndexScan.create(cluster, traitSet, relOptTbl, idxName, null, null, null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
-            RelDataType rowType = protoType.apply(typeFactory);
-
-            if (bitSet != null) {
-                RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
-                for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
-                    b.add(rowType.getFieldList().get(i));
-                rowType = b.build();
-            }
-
-            return rowType;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Statistic getStatistic() {
-            return new Statistic() {
-                /** {@inheritDoc */
-                @Override public Double getRowCount() {
-                    return rowCnt;
-                }
-
-                /** {@inheritDoc */
-                @Override public boolean isKey(ImmutableBitSet cols) {
-                    return false;
-                }
-
-                /** {@inheritDoc */
-                @Override public List<ImmutableBitSet> getKeys() {
-                    throw new AssertionError();
-                }
-
-                /** {@inheritDoc */
-                @Override public List<RelReferentialConstraint> getReferentialConstraints() {
-                    throw new AssertionError();
-                }
-
-                /** {@inheritDoc */
-                @Override public List<RelCollation> getCollations() {
-                    return Collections.emptyList();
-                }
-
-                /** {@inheritDoc */
-                @Override public RelDistribution getDistribution() {
-                    throw new AssertionError();
-                }
-            };
-        }
-
-        /** {@inheritDoc} */
-        @Override public <Row> Iterable<Row> scan(
-            ExecutionContext<Row> execCtx,
-            CollocationGroup group, Predicate<Row> filter,
-            Function<Row, Row> transformer,
-            ImmutableBitSet bitSet
-        ) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Schema.TableType getJdbcTableType() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isRolledUp(String col) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean rolledUpColumnValidInsideAgg(
-            String column,
-            SqlCall call,
-            SqlNode parent,
-            CalciteConnectionConfig config
-        ) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public CollocationGroup colocationGroup(PlanningContext ctx) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteDistribution distribution() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public TableDescriptor descriptor() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<String, IgniteIndex> indexes() {
-            return Collections.unmodifiableMap(indexes);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void addIndex(IgniteIndex idxTbl) {
-            indexes.put(idxTbl.name(), idxTbl);
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteIndex getIndex(String idxName) {
-            return indexes.get(idxName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeIndex(String idxName) {
-            throw new AssertionError();
-        }
-    }
-
-    /** */
-    private static class TestMessageServiceImpl extends MessageServiceImpl {
-        /** */
-        private final TestIoManager mgr;
-
-        /** */
-        private TestMessageServiceImpl(GridTestKernalContext kernal, TestIoManager mgr) {
-            super(kernal);
-            this.mgr = mgr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void send(UUID nodeId, CalciteMessage msg) {
-            mgr.send(localNodeId(), nodeId, msg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean alive(UUID nodeId) {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareMarshal(Message msg) {
-            // No-op;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void prepareUnmarshal(Message msg) {
-            // No-op;
-        }
-    }
-
-    /** */
-    private class TestFailureProcessor extends FailureProcessor {
-        /** */
-        private TestFailureProcessor(GridTestKernalContext kernal) {
-            super(kernal);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean process(FailureContext failureCtx) {
-            Throwable ex = failureContext().error();
-            log().error(ex.getMessage(), ex);
-
-            lastE = ex;
 
-            return true;
+            checkSplitAndSerialization(phys, publicSchema);
         }
     }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java
new file mode 100644
index 0000000..c472142
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TableSpoolPlannerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableSpool;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.junit.Test;
+
+/**
+ * Table spool test.
+ */
+@SuppressWarnings({"TooBroadScope", "FieldCanBeLocal", "TypeMayBeWeakened"})
+public class TableSpoolPlannerTest extends AbstractPlannerTest {
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tableSpoolDistributed() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable t0 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("JID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "T0", "hash");
+            }
+        };
+
+        TestTable t1 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("JID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build()) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.affinity(0, "T1", "hash");
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("T0", t0);
+        publicSchema.addTable("T1", t1);
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid = t1.jid";
+
+        RelNode phys = physicalPlan(sql, publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+
+        assertNotNull(phys);
+
+        IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
+
+        assertNotNull("Invalid plan:\n" + RelOptUtil.toString(phys), tblSpool);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void tableSpoolBroadcastNotRewindable() throws Exception {
+        IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
+
+        TestTable t0 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("JID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build(),
+            RewindabilityTrait.ONE_WAY) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        TestTable t1 = new TestTable(
+            new RelDataTypeFactory.Builder(f)
+                .add("ID", f.createJavaType(Integer.class))
+                .add("JID", f.createJavaType(Integer.class))
+                .add("VAL", f.createJavaType(String.class))
+                .build(),
+            RewindabilityTrait.ONE_WAY) {
+
+            @Override public IgniteDistribution distribution() {
+                return IgniteDistributions.broadcast();
+            }
+        };
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+
+        publicSchema.addTable("T0", t0);
+        publicSchema.addTable("T1", t1);
+
+        String sql = "select * " +
+            "from t0 " +
+            "join t1 on t0.jid = t1.jid";
+
+        RelNode phys = physicalPlan(sql, publicSchema,
+            "MergeJoinConverter", "NestedLoopJoinConverter", "FilterSpoolMergeRule");
+
+        assertNotNull(phys);
+
+        IgniteTableSpool tblSpool = findFirstNode(phys, byClass(IgniteTableSpool.class));
+
+        assertNotNull("Invalid plan:\n" + RelOptUtil.toString(phys), tblSpool);
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
similarity index 53%
copy from modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
copy to modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index c346ada..4de7817 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -17,22 +17,12 @@
 
 package org.apache.ignite.testsuites;
 
-import org.apache.ignite.internal.processors.query.calcite.CalciteBasicSecondaryIndexIntegrationTest;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessorTest;
-import org.apache.ignite.internal.processors.query.calcite.CancelTest;
-import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
-import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
-import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
-import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -41,22 +31,12 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    PlannerTest.class,
-    OrToUnionRuleTest.class,
-    ProjectScanMergeRuleTest.class,
     ExecutionTest.class,
+    ContinuousExecutionTest.class,
     MergeJoinExecutionTest.class,
     NestedLoopJoinExecutionTest.class,
-    ClosableIteratorsHolderTest.class,
-    ContinuousExecutionTest.class,
-    CalciteQueryProcessorTest.class,
-    JdbcQueryTest.class,
-    CalciteBasicSecondaryIndexIntegrationTest.class,
-    CancelTest.class,
-    QueryCheckerTest.class,
-    DateTimeTest.class,
-    TableSpoolTest.class,
-    LimitOffsetTest.class
+    TableSpoolExecutionTest.class,
+    IndexSpoolExecutionTest.class,
 })
-public class IgniteCalciteTestSuite {
+public class ExecutionTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index c346ada..a372500 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -22,14 +22,9 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import org.apache.ignite.internal.processors.query.calcite.DateTimeTest;
 import org.apache.ignite.internal.processors.query.calcite.LimitOffsetTest;
-import org.apache.ignite.internal.processors.query.calcite.PlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
-import org.apache.ignite.internal.processors.query.calcite.TableSpoolTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.rel.ContinuousExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.ExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.MergeJoinExecutionTest;
-import org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJoinExecutionTest;
 import org.apache.ignite.internal.processors.query.calcite.jdbc.JdbcQueryTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest;
 import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest;
@@ -41,12 +36,10 @@ import org.junit.runners.Suite;
  */
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    PlannerTest.class,
+    PlannerTestSuite.class,
+    ExecutionTestSuite.class,
     OrToUnionRuleTest.class,
     ProjectScanMergeRuleTest.class,
-    ExecutionTest.class,
-    MergeJoinExecutionTest.class,
-    NestedLoopJoinExecutionTest.class,
     ClosableIteratorsHolderTest.class,
     ContinuousExecutionTest.class,
     CalciteQueryProcessorTest.class,
@@ -55,7 +48,6 @@ import org.junit.runners.Suite;
     CancelTest.class,
     QueryCheckerTest.class,
     DateTimeTest.class,
-    TableSpoolTest.class,
     LimitOffsetTest.class
 })
 public class IgniteCalciteTestSuite {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
new file mode 100644
index 0000000..ad329f1
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.IndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Calcite tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    PlannerTest.class,
+    CorrelatedNestedLoopJoinPlannerTest.class,
+    TableSpoolPlannerTest.class,
+    IndexSpoolPlannerTest.class
+})
+public class PlannerTestSuite {
+}