You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2022/02/15 06:48:01 UTC

[ignite] branch sql-calcite updated: IGNITE-16111 Index rebuild handling - Fixes #9671.

This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 778b9a2  IGNITE-16111 Index rebuild handling - Fixes #9671.
778b9a2 is described below

commit 778b9a2c64101508d95bcab34b5dc18f2111e73f
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Tue Feb 15 09:44:59 2022 +0300

    IGNITE-16111 Index rebuild handling - Fixes #9671.
    
    Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
 .../query/calcite/exec/LogicalRelImplementor.java  | 103 ++++++-
 .../query/calcite/prepare/QueryPlanCacheImpl.java  |  14 +-
 .../query/calcite/rel/IgniteIndexScan.java         |  45 +--
 .../calcite/rule/LogicalScanConverterRule.java     |  15 +-
 .../calcite/rule/logical/ExposeIndexRule.java      |   3 +
 .../calcite/rule/logical/FilterScanMergeRule.java  |  15 +-
 .../calcite/rule/logical/ProjectScanMergeRule.java |  18 +-
 .../query/calcite/schema/CacheTableImpl.java       |  13 +
 .../query/calcite/schema/IgniteTable.java          |  14 +
 .../query/calcite/schema/SchemaHolderImpl.java     |  22 ++
 .../query/calcite/schema/SystemViewTableImpl.java  |  10 +
 .../calcite/exec/LogicalRelImplementorTest.java    | 329 +++++++++++++++++++++
 .../integration/AbstractBasicIntegrationTest.java  |   7 +-
 .../integration/IndexRebuildIntegrationTest.java   | 316 ++++++++++++++++++++
 .../planner/AbstractAggregatePlannerTest.java      |   2 +-
 .../query/calcite/planner/AbstractPlannerTest.java | 211 +------------
 .../calcite/planner/IndexRebuildPlannerTest.java   |  93 ++++++
 .../query/calcite/planner/TestTable.java           | 242 +++++++++++++++
 .../ignite/testsuites/IgniteCalciteTestSuite.java  |   2 +
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 .../apache/ignite/testsuites/PlannerTestSuite.java |   2 +
 .../query/schema/SchemaChangeListener.java         |  16 +
 .../processors/query/h2/IgniteH2Indexing.java      |  16 +-
 .../processors/query/h2/SchemaManager.java         |  35 +++
 24 files changed, 1282 insertions(+), 263 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 4be5e6c..12d02f7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -91,6 +92,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteReduceS
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleHashAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteSingleSortAggregate;
 import org.apache.ignite.internal.processors.query.calcite.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.processors.query.calcite.rule.LogicalScanConverterRule;
 import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
@@ -99,6 +101,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribut
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
 
 import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
@@ -298,13 +301,107 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> {
         Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
         Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
 
+        ColocationGroup grp = ctx.group(rel.sourceId());
+
         IgniteIndex idx = tbl.getIndex(rel.indexName());
 
-        ColocationGroup group = ctx.group(rel.sourceId());
+        if (idx != null && !tbl.isIndexRebuildInProgress()) {
+            Iterable<Row> rowsIter = idx.scan(ctx, grp, filters, lower, upper, prj, requiredColumns);
+
+            return new ScanNode<>(ctx, rowType, rowsIter);
+        }
+        else {
+            // Index was invalidated after planning; work around it with table-scan -> sort -> index spool.
+            // If the filter or project contains correlates, a spool node is required to make the input rewindable.
+            // A sort node is required if the output should be sorted or if a spool node is required (to provide
+            // search by index conditions).
+            // Additionally, a project node is required when a spool is inserted, since the spool requires the
+            // unmodified original input for filtering by index conditions.
+            boolean filterHasCorrelation = condition != null && RexUtils.hasCorrelation(condition);
+            boolean projectHasCorrelation = projects != null && RexUtils.hasCorrelation(projects);
+            boolean spoolNodeRequired = projectHasCorrelation || filterHasCorrelation;
+            boolean projNodeRequired = projects != null && spoolNodeRequired;
+
+            Iterable<Row> rowsIter = tbl.scan(
+                ctx,
+                grp,
+                filterHasCorrelation ? null : filters,
+                projNodeRequired ? null : prj,
+                requiredColumns
+            );
 
-        Iterable<Row> rowsIter = idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+            // If there are projects in the scan node - after the scan we already have target row type.
+            if (!spoolNodeRequired && projects != null)
+                rowType = rel.getRowType();
 
-        return new ScanNode<>(ctx, rowType, rowsIter);
+            Node<Row> node = new ScanNode<>(ctx, rowType, rowsIter);
+
+            RelCollation collation = rel.collation();
+
+            if ((!spoolNodeRequired && projects != null) || requiredColumns != null) {
+                collation = collation.apply(LogicalScanConverterRule.createMapping(
+                    spoolNodeRequired ? null : projects,
+                    requiredColumns,
+                    tbl.getRowType(typeFactory).getFieldCount()
+                ));
+            }
+
+            boolean sortNodeRequired = !collation.getFieldCollations().isEmpty();
+
+            if (sortNodeRequired) {
+                SortNode<Row> sortNode = new SortNode<>(ctx, rowType, expressionFactory.comparator(collation));
+
+                sortNode.register(node);
+
+                node = sortNode;
+            }
+
+            if (spoolNodeRequired) {
+                if (lowerCond != null || upperCond != null) {
+                    if (requiredColumns != null) {
+                        // Remap index find predicate according to rowType of the spool.
+                        int cardinality = requiredColumns.cardinality();
+                        List<RexNode> remappedLowerCond = lowerCond != null ? new ArrayList<>(cardinality) : null;
+                        List<RexNode> remappedUpperCond = upperCond != null ? new ArrayList<>(cardinality) : null;
+
+                        for (int i = requiredColumns.nextSetBit(0); i != -1; i = requiredColumns.nextSetBit(i + 1)) {
+                            if (remappedLowerCond != null)
+                                remappedLowerCond.add(lowerCond.get(i));
+
+                            if (remappedUpperCond != null)
+                                remappedUpperCond.add(upperCond.get(i));
+                        }
+
+                        lower = remappedLowerCond == null ? null : expressionFactory.rowSource(remappedLowerCond);
+                        upper = remappedUpperCond == null ? null : expressionFactory.rowSource(remappedUpperCond);
+                    }
+                }
+
+                IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
+                    ctx,
+                    rowType,
+                    collation,
+                    expressionFactory.comparator(collation),
+                    filterHasCorrelation ? filters : null, // Not correlated filter included into table scan.
+                    lower,
+                    upper
+                );
+
+                spoolNode.register(node);
+
+                node = spoolNode;
+            }
+
+            if (projNodeRequired) {
+                ProjectNode<Row> projectNode = new ProjectNode<>(ctx, rel.getRowType(), prj);
+
+                projectNode.register(node);
+
+                node = projectNode;
+            }
+
+            return node;
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 87a8a88..37d8296 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -116,8 +116,18 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
     }
 
     /** {@inheritDoc} */
+    @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+        clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+        clear();
+    }
+
+    /** {@inheritDoc} */
     @Override public void onSchemaCreated(String schemaName) {
-        // No-op
+        // No-op.
     }
 
     /** {@inheritDoc} */
@@ -126,7 +136,7 @@ public class QueryPlanCacheImpl extends AbstractService implements QueryPlanCach
         GridQueryTypeDescriptor typeDesc,
         GridCacheContextInfo<?, ?> cacheInfo
     ) {
-        // No-op
+        // No-op.
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
index 339454d..9817440 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteIndexScan.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
 import java.util.List;
-
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rex.RexNode;
@@ -39,6 +39,9 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     /** */
     private final long sourceId;
 
+    /** Index collation. Required only for rewriting index scan to table scan + sort in case of index rebuild. */
+    private final RelCollation collation;
+
     /**
      * Constructor used for deserialization.
      *
@@ -47,6 +50,8 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     public IgniteIndexScan(RelInput input) {
         super(changeTraits(input, IgniteConvention.INSTANCE));
 
+        collation = input.getCollation();
+
         Object srcIdObj = input.get("sourceId");
         if (srcIdObj != null)
             sourceId = ((Number)srcIdObj).longValue();
@@ -60,24 +65,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param traits Traits of this relational expression
      * @param tbl Table definition.
      * @param idxName Index name.
-     */
-    public IgniteIndexScan(
-        RelOptCluster cluster,
-        RelTraitSet traits,
-        RelOptTable tbl,
-        String idxName) {
-        this(cluster, traits, tbl, idxName, null, null, null, null);
-    }
-
-    /**
-     * Creates a IndexScan.
-     * @param cluster Cluster that this relational expression belongs to
-     * @param traits Traits of this relational expression
-     * @param tbl Table definition.
-     * @param idxName Index name.
      * @param proj Projects.
      * @param cond Filters.
      * @param requiredCols Participating columns.
+     * @param collation Index collation.
      */
     public IgniteIndexScan(
         RelOptCluster cluster,
@@ -87,9 +78,10 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
         @Nullable IndexConditions idxCond,
-        @Nullable ImmutableBitSet requiredCols
+        @Nullable ImmutableBitSet requiredCols,
+        RelCollation collation
     ) {
-        this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols);
+        this(-1L, cluster, traits, tbl, idxName, proj, cond, idxCond, requiredCols, collation);
     }
 
     /**
@@ -101,6 +93,7 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
      * @param proj Projects.
      * @param cond Filters.
      * @param requiredCols Participating colunms.
+     * @param collation Index collation.
      */
     private IgniteIndexScan(
         long sourceId,
@@ -111,11 +104,13 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
         @Nullable List<RexNode> proj,
         @Nullable RexNode cond,
         @Nullable IndexConditions idxCond,
-        @Nullable ImmutableBitSet requiredCols
+        @Nullable ImmutableBitSet requiredCols,
+        RelCollation collation
     ) {
         super(cluster, traits, ImmutableList.of(), tbl, idxName, proj, cond, idxCond, requiredCols);
 
         this.sourceId = sourceId;
+        this.collation = collation;
     }
 
     /** {@inheritDoc} */
@@ -126,7 +121,8 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     /** {@inheritDoc} */
     @Override protected RelWriter explainTerms0(RelWriter pw) {
         return super.explainTerms0(pw)
-            .itemIf("sourceId", sourceId, sourceId != -1);
+            .itemIf("sourceId", sourceId, sourceId != -1)
+            .item("collation", collation());
     }
 
     /** {@inheritDoc} */
@@ -137,12 +133,17 @@ public class IgniteIndexScan extends AbstractIndexScan implements SourceAwareIgn
     /** {@inheritDoc} */
     @Override public IgniteRel clone(long sourceId) {
         return new IgniteIndexScan(sourceId, getCluster(), getTraitSet(), getTable(),
-            idxName, projects, condition, idxCond, requiredColumns);
+            idxName, projects, condition, idxCond, requiredColumns, collation);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteIndexScan(sourceId, cluster, getTraitSet(), getTable(),
-            idxName, projects, condition, idxCond, requiredColumns);
+            idxName, projects, condition, idxCond, requiredColumns, collation);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelCollation collation() {
+        return collation;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
index ef26e4c..348f20f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/LogicalScanConverterRule.java
@@ -62,10 +62,16 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
             ) {
                 RelOptCluster cluster = rel.getCluster();
                 IgniteTable table = rel.getTable().unwrap(IgniteTable.class);
-                IgniteIndex index = table.getIndex(rel.indexName());
+                IgniteIndex idx = table.getIndex(rel.indexName());
+
+                if (table.isIndexRebuildInProgress()) {
+                    cluster.getPlanner().prune(rel);
+
+                    return null;
+                }
 
                 RelDistribution distribution = table.distribution();
-                RelCollation collation = index.collation();
+                RelCollation collation = idx.collation();
 
                 if (rel.projects() != null || rel.requiredColumns() != null) {
                     Mappings.TargetMapping mapping = createMapping(
@@ -101,7 +107,8 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
                     rel.projects(),
                     rel.condition(),
                     rel.indexConditions(),
-                    rel.requiredColumns()
+                    rel.requiredColumns(),
+                    idx.collation()
                 );
             }
         };
@@ -154,7 +161,7 @@ public abstract class LogicalScanConverterRule<T extends ProjectableFilterableTa
     }
 
     /** */
-    private static Mappings.TargetMapping createMapping(
+    public static Mappings.TargetMapping createMapping(
         List<RexNode> projects,
         ImmutableBitSet requiredColumns,
         int tableRowSize
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
index 620b009..9fbed19 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ExposeIndexRule.java
@@ -68,6 +68,9 @@ public class ExposeIndexRule extends RelRule<ExposeIndexRule.Config> {
         RexNode condition = scan.condition();
         ImmutableBitSet requiredCols = scan.requiredColumns();
 
+        if (igniteTable.isIndexRebuildInProgress())
+            return;
+
         List<IgniteLogicalIndexScan> indexes = igniteTable.indexes().values().stream()
             .map(idx -> idx.toRel(cluster, optTable, proj, condition, requiredCols))
             .collect(Collectors.toList());
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
index c6c665b..b8a8e62 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/FilterScanMergeRule.java
@@ -31,9 +31,11 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.ignite.internal.processors.query.calcite.rel.ProjectableFilterableTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Rule that pushes filter into the scan. This might be useful for index range scans.
@@ -91,11 +93,14 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
 
         RelNode res = createNode(cluster, scan, trait, condition);
 
+        if (res == null)
+            return;
+
         call.transformTo(res);
     }
 
     /** */
-    protected abstract T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, RexNode cond);
+    protected abstract @Nullable T createNode(RelOptCluster cluster, T scan, RelTraitSet traits, RexNode cond);
 
     /** */
     private static class FilterIndexScanMergeRule extends FilterScanMergeRule<IgniteLogicalIndexScan> {
@@ -105,12 +110,18 @@ public abstract class FilterScanMergeRule<T extends ProjectableFilterableTableSc
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteLogicalIndexScan createNode(
+        @Override protected @Nullable IgniteLogicalIndexScan createNode(
             RelOptCluster cluster,
             IgniteLogicalIndexScan scan,
             RelTraitSet traits,
             RexNode cond
         ) {
+            if (scan.getTable().unwrap(IgniteTable.class).isIndexRebuildInProgress()) {
+                cluster.getPlanner().prune(scan);
+
+                return null;
+            }
+
             return IgniteLogicalIndexScan.create(cluster, traits, scan.getTable(), scan.indexName(),
                 scan.projects(), cond, scan.requiredColumns());
         }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
index f981196..bfe2019 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/logical/ProjectScanMergeRule.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactor
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
 import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
 
 /** */
 @Value.Enclosing
@@ -53,7 +54,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
     public static final RelOptRule TABLE_SCAN_SKIP_CORRELATED = Config.TABLE_SCAN_SKIP_CORRELATED.toRule();
 
     /** */
-    protected abstract T createNode(
+    protected abstract @Nullable T createNode(
         RelOptCluster cluster,
         T scan,
         RelTraitSet traits,
@@ -139,7 +140,12 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
         if (RexUtils.isIdentity(projects, tbl.getRowType(typeFactory, requiredColumns), true))
             projects = null;
 
-        call.transformTo(createNode(cluster, scan, traits, projects, cond, requiredColumns));
+        T res = createNode(cluster, scan, traits, projects, cond, requiredColumns);
+
+        if (res == null)
+            return;
+
+        call.transformTo(res);
 
         if (!RexUtils.hasCorrelation(relProject.getProjects()))
             cluster.getPlanner().prune(relProject);
@@ -188,7 +194,7 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteLogicalIndexScan createNode(
+        @Override protected @Nullable IgniteLogicalIndexScan createNode(
             RelOptCluster cluster,
             IgniteLogicalIndexScan scan,
             RelTraitSet traits,
@@ -196,6 +202,12 @@ public abstract class ProjectScanMergeRule<T extends ProjectableFilterableTableS
             RexNode cond,
             ImmutableBitSet requiredColumns
         ) {
+            if (scan.getTable().unwrap(IgniteTable.class).isIndexRebuildInProgress()) {
+                cluster.getPlanner().prune(scan);
+
+                return null;
+            }
+
             return IgniteLogicalIndexScan.create(
                 cluster,
                 traits,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
index 2a3566a..1d03652 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
@@ -60,6 +60,9 @@ public class CacheTableImpl extends AbstractTable implements IgniteCacheTable {
     /** */
     private final Map<String, IgniteIndex> indexes = new ConcurrentHashMap<>();
 
+    /** */
+    private volatile boolean idxRebuildInProgress;
+
     /**
      * @param ctx Kernal context.
      * @param desc Table descriptor.
@@ -152,6 +155,16 @@ public class CacheTableImpl extends AbstractTable implements IgniteCacheTable {
     }
 
     /** {@inheritDoc} */
+    @Override public void markIndexRebuildInProgress(boolean mark) {
+        idxRebuildInProgress = mark;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return idxRebuildInProgress;
+    }
+
+    /** {@inheritDoc} */
     @Override public <C> C unwrap(Class<C> aCls) {
         if (aCls.isInstance(desc))
             return aCls.cast(desc);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index d67c891..b42f4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -141,4 +141,18 @@ public interface IgniteTable extends TranslatableTable {
      * Is table modifiable.
      */
     boolean isModifiable();
+
+    /**
+     * Mark table for index rebuild.
+     *
+     * @param mark Mark/unmark flag, {@code true} if index rebuild started, {@code false} if finished.
+     */
+    void markIndexRebuildInProgress(boolean mark);
+
+    /**
+     * Returns index rebuild flag.
+     *
+     * @return {@code True} if index rebuild is in progress.
+     */
+    boolean isIndexRebuildInProgress();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
index 69876e9..f755c55 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SchemaHolderImpl.java
@@ -266,6 +266,28 @@ public class SchemaHolderImpl extends AbstractService implements SchemaHolder, S
     }
 
     /** {@inheritDoc} */
+    @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+        IgniteSchema schema = igniteSchemas.get(schemaName);
+        assert schema != null;
+
+        IgniteTable tbl = (IgniteTable)schema.getTable(tblName);
+        assert tbl != null;
+
+        tbl.markIndexRebuildInProgress(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+        IgniteSchema schema = igniteSchemas.get(schemaName);
+        assert schema != null;
+
+        IgniteTable tbl = (IgniteTable)schema.getTable(tblName);
+        assert tbl != null;
+
+        tbl.markIndexRebuildInProgress(false);
+    }
+
+    /** {@inheritDoc} */
     @Override public void onFunctionCreated(String schemaName, String name, Method method) {
         IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
index 3f1056c..fee4ca5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
@@ -153,6 +153,16 @@ public class SystemViewTableImpl extends AbstractTable implements IgniteTable {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public void markIndexRebuildInProgress(boolean mark) {
+        throw new AssertionError("Index rebuild in progress was marked for system view");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return false;
+    }
+
     /** */
     private static class StatisticsImpl implements Statistic {
         /** {@inheritDoc} */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
new file mode 100644
index 0000000..b3d94b9
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementorTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.planner.TestTable;
+import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.calcite.tools.Frameworks.createRootSchema;
+import static org.apache.calcite.tools.Frameworks.newConfigBuilder;
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+/**
+ * Test LogicalRelImplementor class.
+ */
+public class LogicalRelImplementorTest extends GridCommonAbstractTest {
+    /** Checks the execution node chains produced for index scans over a table marked as having an index rebuild in progress. */
+    @Test
+    public void testIndexScanRewriter() {
+        IgniteTypeFactory tf = Commons.typeFactory();
+
+        RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(tf);
+
+        RelDataType sqlTypeInt = tf.createSqlType(SqlTypeName.INTEGER);
+        RelDataType sqlTypeVarchar = tf.createSqlType(SqlTypeName.VARCHAR);
+
+        b.add("_KEY", tf.createJavaType(Object.class));
+        b.add("_VAL", tf.createJavaType(Object.class));
+        b.add("ID", sqlTypeInt);
+        b.add("VAL", sqlTypeVarchar);
+
+        RelDataType rowType = b.build();
+
+        ScanAwareTable tbl = new ScanAwareTable(rowType);
+
+        tbl.addIndex("IDX", 2);
+
+        IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
+        publicSchema.addTable("TBL", tbl);
+
+        BaseQueryContext qctx = BaseQueryContext.builder()
+            .frameworkConfig(
+                newConfigBuilder(FRAMEWORK_CONFIG)
+                    .defaultSchema(createRootSchema(false).add(publicSchema.getName(), publicSchema))
+                    .build()
+            )
+            .logger(log)
+            .build();
+
+        UUID nodeId = UUID.randomUUID();
+
+        ExecutionContext<Object[]> ectx = new ExecutionContext<Object[]>(
+            qctx,
+            null,
+            null,
+            nodeId,
+            nodeId,
+            null,
+            null,
+            ArrayRowHandler.INSTANCE,
+            null
+        ) {
+            @Override public ColocationGroup group(long srcId) {
+                return ColocationGroup.forNodes(Collections.singletonList(nodeId));
+            }
+        };
+
+        LogicalRelImplementor<Object[]> relImplementor = new LogicalRelImplementor<>(
+            ectx,
+            null,
+            null,
+            null,
+            null
+        );
+
+        // Construct relational operator corresponding to SQL: "SELECT val, id, id + 1 FROM TBL WHERE id = 1"
+        RelOptCluster cluster = Commons.emptyCluster();
+        RexBuilder rexBuilder = cluster.getRexBuilder();
+
+        // Projects, filters and required columns.
+        List<RexNode> project = F.asList(
+            rexBuilder.makeLocalRef(sqlTypeVarchar, 1),
+            rexBuilder.makeLocalRef(sqlTypeInt, 0),
+            rexBuilder.makeCall(SqlStdOperatorTable.PLUS,
+                rexBuilder.makeLocalRef(sqlTypeInt, 0),
+                rexBuilder.makeLiteral(1, sqlTypeInt))
+        );
+
+        RexNode filter = rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+            rexBuilder.makeLocalRef(sqlTypeInt, 0),
+            rexBuilder.makeLiteral(1, sqlTypeInt)
+        );
+
+        ImmutableBitSet requiredColumns = ImmutableBitSet.of(2, 3); // Columns ID and VAL of the row type built above.
+
+        // Collations.
+        RelCollation idxCollation = tbl.getIndex("IDX").collation();
+
+        RelCollation colCollation = idxCollation.apply(Mappings.target(requiredColumns.asList(),
+            tbl.getRowType(tf).getFieldCount()));
+
+        RelCollation projCollation = TraitUtils.projectCollation(colCollation, RexUtils.replaceLocalRefs(project),
+            tbl.getRowType(tf, requiredColumns));
+
+        RelCollation emptyCollation = RelCollations.of();
+
+        // Correlated projects and filters.
+        RexShuttle replaceLiteralToCorr = new RexShuttle() {
+            @Override public RexNode visitLiteral(RexLiteral literal) {
+                return rexBuilder.makeFieldAccess(
+                    rexBuilder.makeCorrel(tbl.getRowType(tf), new CorrelationId(0)), "ID", false);
+            }
+        };
+
+        RexNode corrFilter = replaceLiteralToCorr.apply(filter);
+        List<RexNode> corrProject = F.asList(project.get(0), project.get(1), replaceLiteralToCorr.apply(project.get(2)));
+
+        tbl.markIndexRebuildInProgress(true); // Every check below runs with the rebuild mark set on the table.
+
+        Predicate<Node<Object[]>> isScanNoFilterNoProject =
+            node -> node instanceof ScanNode && !tbl.lastScanHasFilter && !tbl.lastScanHasProject;
+        Predicate<Node<Object[]>> isScanWithFilterNoProject =
+            node -> node instanceof ScanNode && tbl.lastScanHasFilter && !tbl.lastScanHasProject;
+        Predicate<Node<Object[]>> isScanWithProjectNoFilter =
+            node -> node instanceof ScanNode && !tbl.lastScanHasFilter && tbl.lastScanHasProject;
+        Predicate<Node<Object[]>> isScanWithFilterWithProject =
+            node -> node instanceof ScanNode && tbl.lastScanHasFilter && tbl.lastScanHasProject;
+
+        Predicate<Node<Object[]>> isSort = node -> node instanceof SortNode;
+        Predicate<Node<Object[]>> isSpool = node -> node instanceof IndexSpoolNode;
+        Predicate<Node<Object[]>> isProj = node -> node instanceof ProjectNode;
+
+        IgniteIndexScan templateScan = new IgniteIndexScan(
+            cluster,
+            cluster.traitSet(),
+            qctx.catalogReader().getTable(F.asList("PUBLIC", "TBL")),
+            "IDX",
+            project,
+            filter,
+            RexUtils.buildSortedIndexConditions(cluster, idxCollation, filter, rowType, requiredColumns),
+            requiredColumns,
+            idxCollation
+        );
+
+        IgniteIndexScan scan;
+
+        // IndexScan without filters and projects transforms to scan and sort.
+        scan = createScan(templateScan, idxCollation, null, null, null);
+        checkNodesChain(relImplementor, scan, isSort, isScanNoFilterNoProject);
+
+        scan = createScan(templateScan, projCollation, null, null, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSort, isScanNoFilterNoProject);
+
+        // IndexScan with simple filters and projects transforms to scan and sort.
+        scan = createScan(templateScan, projCollation, project, filter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSort, isScanWithFilterWithProject);
+
+        scan = createScan(templateScan, colCollation, null, filter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSort, isScanWithFilterNoProject);
+
+        scan = createScan(templateScan, projCollation, project, null, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSort, isScanWithProjectNoFilter);
+
+        // IndexScan with correlated filter without project transforms to scan, sort and spool.
+        scan = createScan(templateScan, projCollation, null, corrFilter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSpool, isSort, isScanNoFilterNoProject);
+
+        // The same case, but with the index collation (instead of the projected one) in the scan's trait set.
+        scan = createScan(templateScan, idxCollation, null, corrFilter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isSpool, isSort, isScanNoFilterNoProject);
+
+        // IndexScan with correlated filter with project transforms to scan, sort, spool and project.
+        scan = createScan(templateScan, projCollation, project, corrFilter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+        // IndexScan with correlated project transforms to scan, sort, spool and project.
+        scan = createScan(templateScan, projCollation, corrProject, null, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+        scan = createScan(templateScan, projCollation, corrProject, filter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanWithFilterNoProject);
+
+        scan = createScan(templateScan, projCollation, corrProject, corrFilter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+        // IndexScan with simple project without collation transforms to scan.
+        List<RexNode> unknownCollationProject = new ArrayList<>(1);
+        unknownCollationProject.add(corrProject.get(0)); // Field "val".
+
+        scan = createScan(templateScan, emptyCollation, unknownCollationProject, filter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isScanWithFilterWithProject);
+
+        scan = createScan(templateScan, emptyCollation, unknownCollationProject, null, requiredColumns);
+        checkNodesChain(relImplementor, scan, isScanWithProjectNoFilter);
+
+        // IndexScan with correlated project without collation transforms to scan, sort, spool and project.
+        List<RexNode> unknownCollationCorrProject = new ArrayList<>(1);
+        unknownCollationCorrProject.add(corrProject.get(2)); // Field "id + $cor0.id".
+
+        scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, filter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanWithFilterNoProject);
+
+        scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, corrFilter, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+
+        scan = createScan(templateScan, emptyCollation, unknownCollationCorrProject, null, requiredColumns);
+        checkNodesChain(relImplementor, scan, isProj, isSpool, isSort, isScanNoFilterNoProject);
+    }
+
+    /** Creates an index scan from the template with the given trait-set collation, projects, filter and required columns. */
+    private IgniteIndexScan createScan(
+        IgniteIndexScan templateScan,
+        RelCollation collation,
+        List<RexNode> projects,
+        RexNode filters,
+        ImmutableBitSet requiredColumns
+    ) {
+        return new IgniteIndexScan(
+            templateScan.getCluster(),
+            templateScan.getTraitSet().replace(collation), // The passed collation only replaces the trait.
+            templateScan.getTable(),
+            templateScan.indexName(),
+            projects,
+            filters,
+            templateScan.indexConditions(), // Index conditions are always taken from the template.
+            requiredColumns,
+            templateScan.collation() // Last constructor argument is also taken from the template, not from the parameter.
+        );
+    }
+
+    /** Implements the scan and matches the resulting node and its chain of first sources against the predicates, in order. */
+    private <Row> void checkNodesChain(
+        LogicalRelImplementor<Row> relImplementor,
+        IgniteIndexScan scan,
+        Predicate<Node<Row>>... predicates
+    ) {
+        Node<Row> cur = relImplementor.visit(scan);
+
+        boolean chainEnded = false;
+
+        for (int i = 0; i < predicates.length; i++) {
+            assertFalse("Not enough nodes", chainEnded);
+            assertTrue("Node " + cur + " doesn't match predicate", predicates[i].test(cur));
+
+            chainEnded = F.isEmpty(cur.sources()); // A node without sources is the end of the chain.
+
+            if (!chainEnded)
+                cur = cur.sources().get(0);
+        }
+
+        assertTrue("Too much nodes", chainEnded);
+    }
+
+    /** Test table that records whether the last scan was requested with a filter and with a row transformer. */
+    private static class ScanAwareTable extends TestTable {
+        /** Whether the last {@code scan()} call received a non-null filter. */
+        private volatile boolean lastScanHasFilter;
+
+        /** Whether the last {@code scan()} call received a non-null row transformer. */
+        private volatile boolean lastScanHasProject;
+
+        /** Creates the table with the given row type. */
+        public ScanAwareTable(RelDataType rowType) {
+            super(rowType);
+        }
+
+        /** {@inheritDoc} Records the presence of filter and transformer and returns no rows. */
+        @Override public <Row> Iterable<Row> scan(
+            ExecutionContext<Row> execCtx,
+            ColocationGroup grp,
+            Predicate<Row> filter,
+            Function<Row, Row> transformer,
+            ImmutableBitSet bitSet)
+        {
+            lastScanHasFilter = filter != null;
+            lastScanHasProject = transformer != null;
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 0da2463..782a05c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -91,9 +91,14 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
 
     /** */
     protected QueryChecker assertQuery(String qry) {
+        return assertQuery(client, qry); // Delegates to the overload, using the node held in the client field.
+    }
+
+    /** Creates a query checker that looks up the query engine in the context of the given node. */
+    protected QueryChecker assertQuery(IgniteEx ignite, String qry) {
         return new QueryChecker(qry) {
             @Override protected QueryEngine getEngine() {
-                return Commons.lookupComponent(client.context(), QueryEngine.class);
+                return Commons.lookupComponent(ignite.context(), QueryEngine.class);
             }
         };
     }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java
new file mode 100644
index 0000000..4d6e986
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexRebuildIntegrationTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.Test;
+
+/**
+ * Test index rebuild.
+ */
+public class IndexRebuildIntegrationTest extends AbstractBasicIntegrationTest {
+    /** Index rebuild init latch: counted down by the blocking rebuild task before it starts waiting. */
+    private static CountDownLatch initLatch;
+
+    /** Index rebuild start latch: the blocking rebuild task waits on it before delegating to the real rebuild. */
+    private static CountDownLatch startLatch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IndexProcessor.idxRebuildCls = BlockingIndexesRebuildTask.class;
+
+        return super.getConfiguration(igniteInstanceName).setDataStorageConfiguration(
+            new DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setPersistenceEnabled(true)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int nodeCount() {
+        return 2; // The tests use grid(0) as the query initiator and grid(1) as a remote node.
+    }
+
+    /** {@inheritDoc} Resets both latches before delegating to the super method. */
+    @Override protected void beforeTest() throws Exception {
+        initLatch = null; // With null latches the blocking rebuild task neither signals nor waits.
+        startLatch = null;
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() {
+        // Super method is intentionally not called to skip caches destroy: tables are created once in beforeTestsStarted().
+        cleanQueryPlanCache();
+    }
+
+    /** {@inheritDoc} Activates the cluster and creates tables TBL (cache "test", three extra indexes) and TBL2, 100 rows each. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        cleanPersistenceDir();
+
+        super.beforeTestsStarted();
+
+        client.cluster().state(ClusterState.ACTIVE);
+
+        executeSql("CREATE TABLE tbl (id INT PRIMARY KEY, val VARCHAR, val2 VARCHAR) WITH CACHE_NAME=\"test\"");
+        executeSql("CREATE INDEX idx_id_val ON tbl (id DESC, val)");
+        executeSql("CREATE INDEX idx_id_val2 ON tbl (id, val2 DESC)");
+        executeSql("CREATE INDEX idx_val ON tbl (val DESC)");
+
+        for (int i = 0; i < 100; i++)
+            executeSql("INSERT INTO tbl VALUES (?, ?, ?)", i, "val" + i, "val" + i);
+
+        executeSql("CREATE TABLE tbl2 (id INT PRIMARY KEY, val VARCHAR)");
+
+        for (int i = 0; i < 100; i++)
+            executeSql("INSERT INTO tbl2 VALUES (?, ?)", i, "val" + i);
+    }
+
+    /** Checks that a query issued on the node rebuilding its indexes uses a table scan during the rebuild and an index scan otherwise. */
+    @Test
+    public void testRebuildOnInitiatorNode() throws Exception {
+        String sql = "SELECT * FROM tbl WHERE id = 0 AND val='val0'";
+
+        QueryChecker validChecker = assertQuery(grid(0), sql)
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns(0, "val0", "val0");
+
+        QueryChecker rebuildingChecker = assertQuery(grid(0), sql)
+            .matches(QueryChecker.containsTableScan("PUBLIC", "TBL"))
+            .returns(0, "val0", "val0");
+
+        checkRebuildIndexQuery(grid(0), validChecker, rebuildingChecker);
+    }
+
+    /** Checks uncorrelated index-scan queries issued from grid(0) while indexes are rebuilt on grid(1); expectations do not change. */
+    @Test
+    public void testRebuildOnRemoteNodeUncorrelated() throws Exception {
+        IgniteEx initNode = grid(0);
+
+        // Uncorrelated, filter by indexed field, without projection (identity projection).
+        for (int i = 0; i < 10; i++) {
+            QueryChecker checker = assertQuery(initNode, "SELECT * FROM tbl WHERE id = ? AND val = ?")
+                .withParams(i, "val" + i)
+                .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+                .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+                .returns(i, "val" + i, "val" + i);
+
+            checkRebuildIndexQuery(grid(1), checker, checker);
+        }
+
+        // Uncorrelated, filter by indexed field, with projection.
+        for (int i = 0; i < 10; i++) {
+            QueryChecker checker = assertQuery(initNode, "SELECT val FROM tbl WHERE id = ? AND val = ?")
+                .withParams(i, "val" + i)
+                .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+                .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+                .returns("val" + i);
+
+            checkRebuildIndexQuery(grid(1), checker, checker);
+        }
+    }
+
+    /** Checks ORDER BY queries issued from grid(0) while indexes are rebuilt on grid(1); expected plan and row order do not change. */
+    @Test
+    public void testRebuildOnRemoteNodeSorted() throws Exception {
+        IgniteEx initNode = grid(0);
+
+        // Order by part of index collation, without projection.
+        String sql = "SELECT * FROM tbl WHERE id >= 10 and id <= 15 ORDER BY id DESC";
+
+        QueryChecker checker = assertQuery(initNode, sql)
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns(15, "val15", "val15").returns(14, "val14", "val14").returns(13, "val13", "val13")
+            .returns(12, "val12", "val12").returns(11, "val11", "val11").returns(10, "val10", "val10")
+            .ordered();
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+
+        // Order by part of index collation, with projection.
+        sql = "SELECT val FROM tbl WHERE id >= 10 and id < 20 ORDER BY id DESC";
+
+        checker = assertQuery(initNode, sql)
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns("val19").returns("val18").returns("val17").returns("val16").returns("val15")
+            .returns("val14").returns("val13").returns("val12").returns("val11").returns("val10")
+            .ordered();
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+
+        // Order by full index collation, with projection.
+        sql = "SELECT val FROM tbl WHERE id >= 10 and id < 20 ORDER BY id DESC, val";
+
+        checker = assertQuery(initNode, sql)
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IndexSpool")))
+            .matches(CoreMatchers.not(QueryChecker.containsSubPlan("IgniteSort")))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns("val19").returns("val18").returns("val17").returns("val16").returns("val15")
+            .returns("val14").returns("val13").returns("val12").returns("val11").returns("val10")
+            .ordered();
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+
+        // Order by another collation.
+        sql = "SELECT * FROM tbl WHERE val BETWEEN 'val10' AND 'val15' ORDER BY val";
+
+        checker = assertQuery(initNode, sql)
+            .matches(QueryChecker.containsSubPlan("IgniteSort"))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_VAL"))
+            .returns(10, "val10", "val10").returns(11, "val11", "val11").returns(12, "val12", "val12")
+            .returns(13, "val13", "val13").returns(14, "val14", "val14").returns(15, "val15", "val15")
+            .ordered();
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+    }
+
+    /** Checks correlated nested loop joins issued from grid(0) while indexes are rebuilt on grid(1); expectations do not change. */
+    @Test
+    public void testRebuildOnRemoteNodeCorrelated() throws Exception {
+        IgniteEx initNode = grid(0);
+
+        // Correlated join with correlation in filter, without project.
+        String sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.val " +
+            "FROM tbl2 LEFT JOIN tbl ON tbl.id = tbl2.id AND tbl.val = tbl2.val AND tbl.id % 2 = 0 " +
+            "WHERE tbl2.id BETWEEN 10 AND 19";
+
+        QueryChecker checker = assertQuery(initNode, sql)
+            .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns(10, "val10").returns(11, null).returns(12, "val12").returns(13, null).returns(14, "val14")
+            .returns(15, null).returns(16, "val16").returns(17, null).returns(18, "val18").returns(19, null);
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+
+        // Correlated join with correlation in filter, with project.
+        sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.val1 " +
+            "FROM tbl2 JOIN (SELECT tbl.val || '-' AS val1, val, id FROM tbl) AS tbl " +
+            "ON tbl.id = tbl2.id AND tbl.val = tbl2.val " +
+            "WHERE tbl2.id BETWEEN 10 AND 12";
+
+        checker = assertQuery(initNode, sql)
+            .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns(10, "val10-").returns(11, "val11-").returns(12, "val12-");
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+    }
+
+    /** Checks correlated joins whose scan projects only a subset of the index collation, with indexes rebuilt on grid(1). */
+    @Test
+    public void testRebuildOnRemoteNodeCollationRestore() throws Exception {
+        IgniteEx initNode = grid(0);
+
+        // Correlated join with correlation in filter, with project as a subset of collation.
+        String sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.id1 " +
+            "FROM tbl2 JOIN (SELECT tbl.id + 1 AS id1, id FROM tbl WHERE val >= 'val') AS tbl " +
+            "ON tbl.id = tbl2.id " +
+            "WHERE tbl2.val BETWEEN 'val10' AND 'val12'";
+
+        QueryChecker checker = assertQuery(initNode, sql)
+            .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL"))
+            .returns(10, 11).returns(11, 12).returns(12, 13);
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+
+        // Correlated join with correlation in filter, with a project as a subset of collation with DESC ordering.
+        sql = "SELECT /*+ DISABLE_RULE('MergeJoinConverter', 'NestedLoopJoinConverter') */ tbl2.id, tbl.id1 " +
+            "FROM tbl2 JOIN (SELECT tbl.id + 1 AS id1, id FROM tbl WHERE val2 >= 'val') AS tbl " +
+            "ON tbl.id = tbl2.id " +
+            "WHERE tbl2.val BETWEEN 'val10' AND 'val12'";
+
+        checker = assertQuery(initNode, sql)
+            .matches(QueryChecker.containsSubPlan("IgniteCorrelatedNestedLoopJoin"))
+            .matches(QueryChecker.containsIndexScan("PUBLIC", "TBL", "IDX_ID_VAL2"))
+            .returns(10, 11).returns(11, 12).returns(12, 13);
+
+        checkRebuildIndexQuery(grid(1), checker, checker);
+    }
+
+    /** Rebuilds indexes of cache "test" on the node: runs the valid checker before and after, the rebuilding one while blocked. */
+    private void checkRebuildIndexQuery(
+        IgniteEx rebuildNode,
+        QueryChecker checkerWhenValid,
+        QueryChecker checkerWhenRebuilding
+    ) throws Exception {
+        initLatch = new CountDownLatch(1);
+        startLatch = new CountDownLatch(1);
+
+        GridCacheContext<?, ?> cctx = rebuildNode.context().cache().cache("test").context();
+
+        checkerWhenValid.check();
+
+        GridTestUtils.runAsync(() -> forceRebuildIndexes(rebuildNode, cctx));
+
+        try {
+            assertTrue("Index rebuild has not been initiated in time", initLatch.await(10, TimeUnit.SECONDS));
+
+            // The rebuild is initiated but still blocked: check query cache invalidated after index rebuild start.
+            checkerWhenRebuilding.check();
+        }
+        finally {
+            startLatch.countDown(); // Always release the blocked rebuild, even if the check above fails.
+        }
+
+        indexRebuildFuture(rebuildNode, cctx.cacheId()).get();
+
+        // Check query cache invalidated after index rebuild finish.
+        checkerWhenValid.check();
+    }
+
+    /** Rebuild task that counts down {@code initLatch} and then waits on {@code startLatch} before the actual rebuild. */
+    static class BlockingIndexesRebuildTask extends IndexesRebuildTask {
+        /** {@inheritDoc} */
+        @Override protected void startRebuild(
+            GridCacheContext cctx,
+            GridFutureAdapter<Void> fut,
+            SchemaIndexCacheVisitorClosure clo,
+            IndexRebuildCancelToken cancelTok
+        ) {
+            try {
+                if (initLatch != null)
+                    initLatch.countDown();
+
+                if (startLatch != null)
+                    startLatch.await(100, TimeUnit.SECONDS); // Bounded wait: the rebuild proceeds after the timeout anyway.
+            }
+            catch (InterruptedException ignored) {
+                Thread.currentThread().interrupt(); // Restore the interrupt flag instead of silently swallowing it.
+            }
+            super.startRebuild(cctx, fut, clo, cancelTok);
+        }
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
index 421b948..dde5390 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractAggregatePlannerTest.java
@@ -57,7 +57,7 @@ public class AbstractAggregatePlannerTest extends AbstractPlannerTest {
     /**
      * @return PARTITIONED test table (ID, VAL0, VAL1, GRP0, GRP1)
      */
-    @NotNull protected AbstractPlannerTest.TestTable createAffinityTable() {
+    @NotNull protected TestTable createAffinityTable() {
         IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
 
         return new TestTable(
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 8cc783b..62b1fb0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -22,23 +22,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import com.google.common.collect.ImmutableSet;
-import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.AbstractRelNode;
-import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.RelVisitor;
@@ -46,14 +40,9 @@ import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelDataTypeImpl;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ColumnStrategy;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlNode;
@@ -88,20 +77,13 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
-import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
 import org.apache.ignite.internal.processors.query.calcite.schema.ColumnDescriptor;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
 import org.apache.ignite.internal.processors.query.calcite.schema.ModifyTuple;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
-import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -664,7 +646,7 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     }
 
     /** */
-    private BaseQueryContext baseQueryContext(Collection<IgniteSchema> schemas) {
+    protected BaseQueryContext baseQueryContext(Collection<IgniteSchema> schemas) {
         SchemaPlus rootSchema = createRootSchema(false);
         SchemaPlus dfltSchema = null;
 
@@ -686,197 +668,6 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
     }
 
     /** */
-    protected static class TestTable implements IgniteCacheTable {
-        /** */
-        private final String name;
-
-        /** */
-        private final RelProtoDataType protoType;
-
-        /** */
-        private final Map<String, IgniteIndex> indexes = new HashMap<>();
-
-        /** */
-        private IgniteDistribution distribution;
-
-        /** */
-        private IgniteStatisticsImpl statistics;
-
-        /** */
-        private final CacheTableDescriptor desc;
-
-        /** */
-        TestTable(RelDataType type) {
-            this(type, 100.0);
-        }
-
-        /** */
-        TestTable(RelDataType type, double rowCnt) {
-            this(UUID.randomUUID().toString(), type, rowCnt);
-        }
-
-        /** */
-        TestTable(String name, RelDataType type, double rowCnt) {
-            protoType = RelDataTypeImpl.proto(type);
-            statistics = new IgniteStatisticsImpl(new ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
-            this.name = name;
-
-            desc = new TestTableDescriptor(this::distribution, type);
-        }
-
-        /**
-         * Set table distribution.
-         *
-         * @param distribution Table distribution to set.
-         * @return TestTable for chaining.
-         */
-        public TestTable setDistribution(IgniteDistribution distribution) {
-            this.distribution = distribution;
-
-            return this;
-        }
-
-        /**
-         * Set table statistics;
-         *
-         * @param statistics Statistics to set.
-         * @return TestTable for chaining.
-         */
-        public TestTable setStatistics(IgniteStatisticsImpl statistics) {
-            this.statistics = statistics;
-
-            return this;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteLogicalTableScan toRel(
-            RelOptCluster cluster,
-            RelOptTable relOptTbl,
-            @Nullable List<RexNode> proj,
-            @Nullable RexNode cond,
-            @Nullable ImmutableBitSet requiredColumns
-        ) {
-            return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), relOptTbl, proj, cond, requiredColumns);
-        }
-
-        /** {@inheritDoc} */
-        @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
-            RelDataType rowType = protoType.apply(typeFactory);
-
-            if (bitSet != null) {
-                RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
-                for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
-                    b.add(rowType.getFieldList().get(i));
-                rowType = b.build();
-            }
-
-            return rowType;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Statistic getStatistic() {
-            return statistics;
-        }
-
-        /** {@inheritDoc} */
-        @Override public <Row> Iterable<Row> scan(
-            ExecutionContext<Row> execCtx,
-            ColocationGroup group, Predicate<Row> filter,
-            Function<Row, Row> transformer,
-            ImmutableBitSet bitSet
-        ) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Schema.TableType getJdbcTableType() {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isRolledUp(String col) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean rolledUpColumnValidInsideAgg(
-            String column,
-            SqlCall call,
-            SqlNode parent,
-            CalciteConnectionConfig config
-        ) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteDistribution distribution() {
-            if (distribution != null)
-                return distribution;
-
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public CacheTableDescriptor descriptor() {
-            return desc;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<String, IgniteIndex> indexes() {
-            return Collections.unmodifiableMap(indexes);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void addIndex(IgniteIndex idxTbl) {
-            indexes.put(idxTbl.name(), idxTbl);
-        }
-
-        /** */
-        public TestTable addIndex(RelCollation collation, String name) {
-            indexes.put(name, new CacheIndexImpl(collation, name, null, this));
-
-            return this;
-        }
-
-        /** */
-        public TestTable addIndex(String name, int... keys) {
-            addIndex(TraitUtils.createCollation(Arrays.stream(keys).boxed().collect(Collectors.toList())), name);
-
-            return this;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteIndex getIndex(String idxName) {
-            return indexes.get(idxName);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void removeIndex(String idxName) {
-            throw new AssertionError();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void ensureCacheStarted() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isModifiable() {
-            return true;
-        }
-
-        /** */
-        public String name() {
-            return name;
-        }
-    }
-
-    /** */
     static class TestTableDescriptor implements CacheTableDescriptor {
         /** */
         private final Supplier<IgniteDistribution> distributionSupp;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java
new file mode 100644
index 0000000..da86c45
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/IndexRebuildPlannerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Planner test checking which scan (index or table) is planned depending on the index rebuild flag of a table.
+ */
+public class IndexRebuildPlannerTest extends AbstractPlannerTest {
+    /** Schema created in {@link #setup()} that contains the table under test. */
+    private IgniteSchema publicSchema;
+
+    /** Table "TBL" (columns ID, VAL) with index "IDX", created in {@link #setup()}. */
+    private TestTable tbl;
+
+    /** {@inheritDoc} */
+    @Override public void setup() {
+        super.setup();
+
+        tbl = createTable("TBL", 100, IgniteDistributions.single(), "ID", Integer.class, "VAL", String.class)
+            .addIndex("IDX", 0);
+
+        publicSchema = createSchema(tbl);
+    }
+
+    /** Expects an index scan by default, a table scan while rebuild is marked, and an index scan again once unmarked. */
+    @Test
+    public void testIndexRebuild() throws Exception {
+        String sql = "SELECT * FROM TBL WHERE id = 0";
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteIndexScan.class));
+
+        tbl.markIndexRebuildInProgress(true);
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteTableScan.class));
+
+        tbl.markIndexRebuildInProgress(false);
+
+        assertPlan(sql, publicSchema, isInstanceOf(IgniteIndexScan.class));
+    }
+
+    /** Plans the query 1000 times while another thread toggles the rebuild flag; either scan type is accepted. */
+    @Test
+    public void testConcurrentIndexRebuildStateChange() throws Exception {
+        String sql = "SELECT * FROM TBL WHERE id = 0";
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
+            while (!stop.get()) {
+                tbl.markIndexRebuildInProgress(true);
+                tbl.markIndexRebuildInProgress(false);
+            }
+        });
+
+        try {
+            for (int i = 0; i < 1000; i++) {
+                IgniteRel rel = physicalPlan(sql, publicSchema);
+
+                assertTrue(rel instanceof IgniteTableScan || rel instanceof IgniteIndexScan);
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        fut.get();
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
new file mode 100644
index 0000000..742399c
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -0,0 +1,242 @@
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeImpl;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.prepare.MappingQueryContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalTableScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteStatisticsImpl;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.stat.ObjectStatisticsImpl;
+import org.jetbrains.annotations.Nullable;
+
+/** In-memory {@link IgniteCacheTable} for planner tests; several operations are unsupported and throw {@link AssertionError}. */
+public class TestTable implements IgniteCacheTable {
+    /** Table name. */
+    private final String name;
+
+    /** Prototype of the table row type, resolved against a type factory in {@link #getRowType}. */
+    private final RelProtoDataType protoType;
+
+    /** Indexes of the table, keyed by index name. */
+    private final Map<String, IgniteIndex> indexes = new HashMap<>();
+
+    /** Table distribution; not assigned by constructors, see {@link #setDistribution(IgniteDistribution)}. */
+    private IgniteDistribution distribution;
+
+    /** Table statistics; initialized from the row count passed to the constructor. */
+    private IgniteStatisticsImpl statistics;
+
+    /** Table descriptor built from the row type and the {@link #distribution()} supplier. */
+    private final CacheTableDescriptor desc;
+
+    /** Flag set by {@link #markIndexRebuildInProgress(boolean)}; volatile because tests toggle it from another thread. */
+    private volatile boolean idxRebuildInProgress;
+
+    /** Creates a table with a random UUID name and a row count of 100. */
+    protected TestTable(RelDataType type) {
+        this(type, 100.0);
+    }
+
+    /** Creates a table with a random UUID name and the given row count. */
+    protected TestTable(RelDataType type, double rowCnt) {
+        this(UUID.randomUUID().toString(), type, rowCnt);
+    }
+
+    /** Creates a table with the given name, row type and row count (cast to {@code long} for the statistics). */
+    protected TestTable(String name, RelDataType type, double rowCnt) {
+        protoType = RelDataTypeImpl.proto(type);
+        statistics = new IgniteStatisticsImpl(new ObjectStatisticsImpl((long)rowCnt, Collections.emptyMap()));
+        this.name = name;
+
+        desc = new AbstractPlannerTest.TestTableDescriptor(this::distribution, type);
+    }
+
+    /**
+     * Set table distribution.
+     *
+     * @param distribution Table distribution to set.
+     * @return TestTable for chaining.
+     */
+    public TestTable setDistribution(IgniteDistribution distribution) {
+        this.distribution = distribution;
+
+        return this;
+    }
+
+    /**
+     * Set table statistics.
+     *
+     * @param statistics Statistics to set.
+     * @return TestTable for chaining.
+     */
+    public TestTable setStatistics(IgniteStatisticsImpl statistics) {
+        this.statistics = statistics;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogicalTableScan toRel(
+        RelOptCluster cluster,
+        RelOptTable relOptTbl,
+        @Nullable List<RexNode> proj,
+        @Nullable RexNode cond,
+        @Nullable ImmutableBitSet requiredColumns
+    ) {
+        return IgniteLogicalTableScan.create(cluster, cluster.traitSet(), relOptTbl, proj, cond, requiredColumns);
+    }
+
+    /** {@inheritDoc} When {@code bitSet} is not {@code null}, only the fields at the set bit positions are kept. */
+    @Override public RelDataType getRowType(RelDataTypeFactory typeFactory, ImmutableBitSet bitSet) {
+        RelDataType rowType = protoType.apply(typeFactory);
+
+        if (bitSet != null) {
+            RelDataTypeFactory.Builder b = new RelDataTypeFactory.Builder(typeFactory);
+            for (int i = bitSet.nextSetBit(0); i != -1; i = bitSet.nextSetBit(i + 1))
+                b.add(rowType.getFieldList().get(i));
+            rowType = b.build();
+        }
+
+        return rowType;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statistic getStatistic() {
+        return statistics;
+    }
+
+    /** {@inheritDoc} Not supported by this test table: always throws {@link AssertionError}. */
+    @Override public <Row> Iterable<Row> scan(
+        ExecutionContext<Row> execCtx,
+        ColocationGroup grp, Predicate<Row> filter,
+        Function<Row, Row> transformer,
+        ImmutableBitSet bitSet
+    ) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} Not supported by this test table: always throws {@link AssertionError}. */
+    @Override public Schema.TableType getJdbcTableType() {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRolledUp(String col) {
+        return false;
+    }
+
+    /** {@inheritDoc} Not supported by this test table: always throws {@link AssertionError}. */
+    @Override public boolean rolledUpColumnValidInsideAgg(
+        String column,
+        SqlCall call,
+        SqlNode parent,
+        CalciteConnectionConfig cfg
+    ) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} Not supported by this test table: always throws {@link AssertionError}. */
+    @Override public ColocationGroup colocationGroup(MappingQueryContext ctx) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} Throws {@link AssertionError} if no distribution has been set. */
+    @Override public IgniteDistribution distribution() {
+        if (distribution != null)
+            return distribution;
+
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheTableDescriptor descriptor() {
+        return desc;
+    }
+
+    /** {@inheritDoc} Returns an unmodifiable view of the index map. */
+    @Override public Map<String, IgniteIndex> indexes() {
+        return Collections.unmodifiableMap(indexes);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addIndex(IgniteIndex idxTbl) {
+        indexes.put(idxTbl.name(), idxTbl);
+    }
+
+    /** Registers a {@link CacheIndexImpl} with the given collation under the given name; returns this table for chaining. */
+    public TestTable addIndex(RelCollation collation, String name) {
+        indexes.put(name, new CacheIndexImpl(collation, name, null, this));
+
+        return this;
+    }
+
+    /** Registers an index whose collation is created from the given keys; returns this table for chaining. */
+    public TestTable addIndex(String name, int... keys) {
+        addIndex(TraitUtils.createCollation(Arrays.stream(keys).boxed().collect(Collectors.toList())), name);
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteIndex getIndex(String idxName) {
+        return indexes.get(idxName);
+    }
+
+    /** {@inheritDoc} Not supported by this test table: always throws {@link AssertionError}. */
+    @Override public void removeIndex(String idxName) {
+        throw new AssertionError();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensureCacheStarted() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isModifiable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markIndexRebuildInProgress(boolean mark) {
+        idxRebuildInProgress = mark;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isIndexRebuildInProgress() {
+        return idxRebuildInProgress;
+    }
+
+    /** @return Table name. */
+    public String name() {
+        return name;
+    }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
index 6338aef..8633d4d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IgniteCalciteTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import org.apache.ignite.internal.processors.query.calcite.QueryCheckerTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.ClosableIteratorsHolderTest;
+import org.apache.ignite.internal.processors.query.calcite.exec.LogicalRelImplementorTest;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteSqlFunctionsTest;
 import org.apache.ignite.internal.processors.query.calcite.sql.SqlDdlParserTest;
 import org.junit.runner.RunWith;
@@ -37,6 +38,7 @@ import org.junit.runners.Suite;
     QueryCheckerTest.class,
     SqlDdlParserTest.class,
     IgniteSqlFunctionsTest.class,
+    LogicalRelImplementorTest.class,
 
     ScriptTestSuite.class,
 })
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index 73b044a..e51017c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.CalciteEr
 import org.apache.ignite.internal.processors.query.calcite.integration.CorrelatesIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.HashSpoolIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexDdlIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.IndexRebuildIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexScanlIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
 import org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
@@ -97,6 +98,7 @@ import org.junit.runners.Suite;
     UserDefinedFunctionsIntegrationTest.class,
     CorrelatesIntegrationTest.class,
     SystemViewsIntegrationTest.class,
+    IndexRebuildIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 5de95d3..524ac14 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNes
 import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedSubqueryPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.IndexRebuildPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.JoinCommutePlannerTest;
 import org.apache.ignite.internal.processors.query.calcite.planner.JoinWithUsingPlannerTest;
@@ -67,6 +68,7 @@ import org.junit.runners.Suite;
     CorrelatedSubqueryPlannerTest.class,
     JoinWithUsingPlannerTest.class,
     ProjectFilterScanMergePlannerTest.class,
+    IndexRebuildPlannerTest.class,
 })
 public class PlannerTestSuite {
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
index 1334f4f..2783094 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaChangeListener.java
@@ -93,6 +93,22 @@ public interface SchemaChangeListener {
     public void onIndexDropped(String schemaName, String tblName, String idxName);
 
     /**
+     * Callback on index rebuild started for all indexes in the table.
+     *
+     * @param schemaName Schema name.
+     * @param tblName Table name.
+     */
+    public void onIndexRebuildStarted(String schemaName, String tblName);
+
+    /**
+     * Callback on index rebuild finished for all indexes in the table.
+     *
+     * @param schemaName Schema name.
+     * @param tblName Table name.
+     */
+    public void onIndexRebuildFinished(String schemaName, String tblName);
+
+    /**
      * Callback on function creation.
      *
      * @param schemaName Schema name.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9ff8ea6..ee539d9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2073,21 +2073,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public void markAsRebuildNeeded(GridCacheContext cctx, boolean val) {
-        markIndexRebuild(cctx.name(), val);
-    }
-
-    /**
-     * Mark tables for index rebuild, so that their indexes are not used.
-     *
-     * @param cacheName Cache name.
-     * @param val Value.
-     */
-    private void markIndexRebuild(String cacheName, boolean val) {
-        for (H2TableDescriptor tblDesc : schemaMgr.tablesForCache(cacheName)) {
-            assert tblDesc.table() != null;
-
-            tblDesc.table().markRebuildFromHashInProgress(val);
-        }
+        schemaMgr.markIndexRebuild(cctx.name(), val);
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
index 528a772..0101c5f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/SchemaManager.java
@@ -882,6 +882,25 @@ public class SchemaManager implements GridQuerySchemaManager {
         return null;
     }
 
+    /**
+     * Mark tables of the cache for index rebuild, so that their indexes are not used, and notify the schema listener.
+     *
+     * @param cacheName Cache name.
+     * @param mark Mark/unmark flag, {@code true} if index rebuild started, {@code false} if finished.
+     */
+    public void markIndexRebuild(String cacheName, boolean mark) {
+        for (H2TableDescriptor tblDesc : tablesForCache(cacheName)) {
+            assert tblDesc.table() != null;
+
+            tblDesc.table().markRebuildFromHashInProgress(mark);
+
+            if (mark)
+                lsnr.onIndexRebuildStarted(tblDesc.schemaName(), tblDesc.tableName());
+            else
+                lsnr.onIndexRebuildFinished(tblDesc.schemaName(), tblDesc.tableName());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public GridQueryTypeDescriptor typeDescriptorForTable(String schemaName, String tableName) {
         GridH2Table dataTable = dataTable(schemaName, tableName);
@@ -929,6 +948,12 @@ public class SchemaManager implements GridQuerySchemaManager {
         @Override public void onIndexDropped(String schemaName, String tblName, String idxName) {}
 
         /** {@inheritDoc} */
+        @Override public void onIndexRebuildStarted(String schemaName, String tblName) {}
+
+        /** {@inheritDoc} */
+        @Override public void onIndexRebuildFinished(String schemaName, String tblName) {}
+
+        /** {@inheritDoc} */
         @Override public void onSqlTypeCreated(
             String schemaName,
             GridQueryTypeDescriptor typeDesc,
@@ -1025,6 +1050,16 @@ public class SchemaManager implements GridQuerySchemaManager {
         }
 
         /** {@inheritDoc} */
+        @Override public void onIndexRebuildStarted(String schemaName, String tblName) {
+            lsnrs.forEach(lsnr -> lsnr.onIndexRebuildStarted(schemaName, tblName));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onIndexRebuildFinished(String schemaName, String tblName) {
+            lsnrs.forEach(lsnr -> lsnr.onIndexRebuildFinished(schemaName, tblName));
+        }
+
+        /** {@inheritDoc} */
         @Override public void onFunctionCreated(String schemaName, String name, Method method) {
             lsnrs.forEach(lsnr -> lsnr.onFunctionCreated(schemaName, name, method));
         }