You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/04/12 14:49:18 UTC

[ignite] branch master updated: IGNITE-14321 Fix force index rebuilding (#8962)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 43a7a1a  IGNITE-14321 Fix force index rebuilding (#8962)
43a7a1a is described below

commit 43a7a1a14cec08ec1bbe706a57753b7498cc04d2
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Apr 12 17:49:01 2021 +0300

    IGNITE-14321 Fix force index rebuilding (#8962)
---
 .../GridCommandHandlerIndexForceRebuildTest.java   |   8 +-
 .../internal/cache/query/index/IndexProcessor.java |   9 +-
 .../managers/indexing/IndexesRebuildTask.java      |  53 ++--
 .../preloader/GridDhtPartitionsExchangeFuture.java |   7 +
 .../GridCacheDatabaseSharedManager.java            |  74 +++--
 .../IgniteCacheDatabaseSharedManager.java          |  10 +-
 .../processors/query/GridQueryProcessor.java       | 143 +++++++---
 .../processors/query/IndexRebuildAware.java        | 145 ++++++++++
 .../junits/common/GridCommonAbstractTest.java      |  38 +++
 .../cache/index/ForceRebuildIndexTest.java         | 317 +++++++++++++++++++++
 .../cache/index/IndexesRebuildTaskEx.java          | 207 ++++++++++++++
 .../cache/index/StopRebuildIndexTest.java          | 191 ++-----------
 .../IgnitePdsIndexingDefragmentationTest.java      |   5 +-
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   4 +-
 14 files changed, 950 insertions(+), 261 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
index 904055fd..4d8b181 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexForceRebuildTest.java
@@ -406,15 +406,15 @@ public class GridCommandHandlerIndexForceRebuildTest extends GridCommandHandlerA
 
             waitForIndexesRebuild(n);
 
+            intlRebIdxFut.get(getTestTimeout());
+            destroyCacheFut.get(getTestTimeout());
+            putCacheFut.get(getTestTimeout());
+
             injectTestSystemOut();
 
             assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "--check-crc", cacheName1));
 
             assertContains(log, testOut.toString(), "no issues found.");
-
-            intlRebIdxFut.get(getTestTimeout());
-            destroyCacheFut.get(getTestTimeout());
-            putCacheFut.get(getTestTimeout());
         }
         finally {
             stopLoad.set(true);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
index f10b0f2..1b16e33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexProcessor.java
@@ -341,9 +341,13 @@ public class IndexProcessor extends GridProcessorAdapter {
 
     /**
      * Start rebuild of indexes for specified cache.
+     *
+     * @param cctx Cache context.
+     * @param force Force rebuild indexes.
+     * @return A future of rebuilding cache indexes.
      */
-    public IgniteInternalFuture<?> rebuildIndexesForCache(GridCacheContext<?, ?> cctx) {
-        return idxRebuild.rebuild(cctx);
+    @Nullable public IgniteInternalFuture<?> rebuildIndexesForCache(GridCacheContext<?, ?> cctx, boolean force) {
+        return idxRebuild.rebuild(cctx, force);
     }
 
     /** */
@@ -555,6 +559,7 @@ public class IndexProcessor extends GridProcessorAdapter {
     /**
      * Collect indexes for rebuild.
      *
+     * @param cctx Cache context.
      * @param createdOnly Get only created indexes (not restored from disk).
      */
     public List<InlineIndex> treeIndexes(GridCacheContext cctx, boolean createdOnly) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
index df70a128..6cb3850 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
@@ -38,9 +38,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
-import static java.util.Objects.isNull;
-import static java.util.Objects.nonNull;
-
 /**
  * Task that rebuilds indexes.
  */
@@ -48,9 +45,15 @@ public class IndexesRebuildTask {
     /** Index rebuilding futures for caches. Mapping: cacheId -> rebuild indexes future. */
     private final Map<Integer, SchemaIndexCacheFuture> idxRebuildFuts = new ConcurrentHashMap<>();
 
-    /** Start to rebuild. */
-    public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
-        assert nonNull(cctx);
+    /**
+     * Start to rebuild.
+     *
+     * @param cctx Cache context.
+     * @param force Force rebuild indexes.
+     * @return A future of rebuilding cache indexes.
+     */
+    @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+        assert cctx != null;
 
         if (!CU.affinityNode(cctx.localNode(), cctx.config().getNodeFilter()))
             return null;
@@ -68,7 +71,7 @@ public class IndexesRebuildTask {
             clo = row -> cctx.queries().store(row, null, mvccEnabled);
         }
         else {
-            Collection<InlineIndex> toRebuild = cctx.kernalContext().indexProcessor().treeIndexes(cctx, true);
+            Collection<InlineIndex> toRebuild = cctx.kernalContext().indexProcessor().treeIndexes(cctx, !force);
 
             if (F.isEmpty(toRebuild))
                 return null;
@@ -84,17 +87,20 @@ public class IndexesRebuildTask {
         // To avoid possible data race.
         GridFutureAdapter<Void> outRebuildCacheIdxFut = new GridFutureAdapter<>();
 
-        // An internal future for the ability to cancel index rebuilding.
-        // This behavior should be discussed in IGNITE-14321.
         IgniteLogger log = cctx.kernalContext().grid().log();
 
+        // An internal future for the ability to cancel index rebuilding.
         SchemaIndexCacheFuture intRebFut = new SchemaIndexCacheFuture(new SchemaIndexOperationCancellationToken());
-        cancelIndexRebuildFuture(idxRebuildFuts.put(cctx.cacheId(), intRebFut), log);
+
+        SchemaIndexCacheFuture prevIntRebFut = idxRebuildFuts.put(cctx.cacheId(), intRebFut);
+
+        // Check that the previous rebuild is completed.
+        assert prevIntRebFut == null;
 
         rebuildCacheIdxFut.listen(fut -> {
             Throwable err = fut.error();
 
-            if (isNull(err)) {
+            if (err == null) {
                 try {
                     cctx.kernalContext().query().markAsRebuildNeeded(cctx, false);
                 }
@@ -103,13 +109,13 @@ public class IndexesRebuildTask {
                 }
             }
 
-            if (nonNull(err))
+            if (err != null)
                 U.error(log, "Failed to rebuild indexes for cache: " + cacheName, err);
 
-            outRebuildCacheIdxFut.onDone(err);
-
             idxRebuildFuts.remove(cctx.cacheId(), intRebFut);
             intRebFut.onDone(err);
+
+            outRebuildCacheIdxFut.onDone(err);
         });
 
         startRebuild(cctx, rebuildCacheIdxFut, clo, intRebFut.cancelToken());
@@ -117,9 +123,20 @@ public class IndexesRebuildTask {
         return outRebuildCacheIdxFut;
     }
 
-    /** Actual start rebuilding. Use this method for test purposes only. */
-    protected void startRebuild(GridCacheContext cctx, GridFutureAdapter<Void> fut,
-        SchemaIndexCacheVisitorClosure clo, SchemaIndexOperationCancellationToken cancel) {
+    /**
+     * Actual start rebuilding. Use this method for test purposes only.
+     *
+     * @param cctx Cache context.
+     * @param fut Future for rebuild indexes.
+     * @param clo Closure.
+     * @param cancel Cancellation token.
+     */
+    protected void startRebuild(
+        GridCacheContext cctx,
+        GridFutureAdapter<Void> fut,
+        SchemaIndexCacheVisitorClosure clo,
+        SchemaIndexOperationCancellationToken cancel
+    ) {
         new SchemaIndexCacheVisitorImpl(cctx, cancel, fut).visit(clo);
     }
 
@@ -127,6 +144,7 @@ public class IndexesRebuildTask {
      * Stop rebuilding indexes.
      *
      * @param cacheInfo Cache context info.
+     * @param log Logger.
      */
     public void stopRebuild(GridCacheContextInfo cacheInfo, IgniteLogger log) {
         cancelIndexRebuildFuture(idxRebuildFuts.remove(cacheInfo.cacheId()), log);
@@ -136,6 +154,7 @@ public class IndexesRebuildTask {
      * Cancel rebuilding indexes for the cache through a future.
      *
      * @param rebFut Index rebuilding future.
+     * @param log Logger.
      */
     private void cancelIndexRebuildFuture(@Nullable SchemaIndexCacheFuture rebFut, IgniteLogger log) {
         if (rebFut != null && !rebFut.isDone() && rebFut.cancelToken().cancel()) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index dcb131f..e63d78a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2429,6 +2429,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             span.addTag(SpanTags.ERROR, errf::toString);
         }
 
+        boolean cleanIdxRebuildFutures = true;
+
         try {
             waitUntilNewCachesAreRegistered();
 
@@ -2529,6 +2531,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (err == null) {
                 cctx.database().rebuildIndexesIfNeeded(this);
 
+                cleanIdxRebuildFutures = false;
+
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                     if (!grp.isLocal())
                         grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
@@ -2548,6 +2552,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
 
         final Throwable err0 = err;
 
+        if (err0 != null && cleanIdxRebuildFutures)
+            cctx.kernalContext().query().removeIndexRebuildFuturesOnExchange(this, null);
+
         // Should execute this listener first, before any external listeners.
         // Listeners use stack as data structure.
         listen(f -> {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 87b15b6..e6ca08c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -166,8 +166,11 @@ import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptyList;
 import static java.util.Objects.nonNull;
 import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
@@ -403,7 +406,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 catch (IgniteCheckedException e) {
                     log.warning("Metastore iteration error", e);
 
-                    return Collections.emptyList();
+                    return emptyList();
                 }
             }, identity());
     }
@@ -1429,25 +1432,55 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (defrgMgr != null)
             return;
 
-        rebuildIndexes(cctx.cacheContexts(), (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()));
+        Collection<GridCacheContext> rejected = rebuildIndexes(
+            cctx.cacheContexts(),
+            (cacheCtx) -> cacheCtx.startTopologyVersion().equals(exchangeFut.initialVersion()) &&
+                cctx.kernalContext().query().rebuildIndexOnExchange(cacheCtx.cacheId(), exchangeFut),
+            false
+        );
+
+        if (!rejected.isEmpty()) {
+            cctx.kernalContext().query().removeIndexRebuildFuturesOnExchange(
+                exchangeFut,
+                rejected.stream().map(GridCacheContext::cacheId).collect(toSet())
+            );
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public void forceRebuildIndexes(Collection<GridCacheContext> contexts) {
-        contexts.forEach(ctx -> cctx.kernalContext().query().prepareIndexRebuildFuture(ctx.cacheId()));
+    @Override public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
+        Set<Integer> cacheIds = contexts.stream().map(GridCacheContext::cacheId).collect(toSet());
 
-        rebuildIndexes(contexts, (cacheCtx) -> true);
+        Set<Integer> rejected = cctx.kernalContext().query().prepareRebuildIndexes(cacheIds);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Preparing features of rebuilding indexes for caches on force rebuild [requested=" + cacheIds +
+                ", rejected=" + rejected + ']');
+        }
+
+        rebuildIndexes(contexts, (cacheCtx) -> !rejected.contains(cacheCtx.cacheId()), true);
+
+        return rejected.isEmpty() ? emptyList() :
+            contexts.stream().filter(ctx -> rejected.contains(ctx.cacheId())).collect(toList());
     }
 
     /**
+     * Rebuilding indexes for caches.
+     *
      * @param contexts Collection of cache contexts for which indexes should be rebuilt.
      * @param rebuildCond Condition that should be met for indexes to be rebuilt for specific cache.
+     * @param force Force rebuild indexes.
+     * @return Cache contexts that did not satisfy {@code rebuildCond}.
      */
-    private void rebuildIndexes(Collection<GridCacheContext> contexts, Predicate<GridCacheContext> rebuildCond) {
+    private Collection<GridCacheContext> rebuildIndexes(
+        Collection<GridCacheContext> contexts,
+        Predicate<GridCacheContext> rebuildCond,
+        boolean force
+    ) {
         GridQueryProcessor qryProc = cctx.kernalContext().query();
 
         if (!qryProc.moduleEnabled())
-            return;
+            return emptyList();
 
         GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
             contexts.size(),
@@ -1458,17 +1491,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             1  //need at least 1 index rebuilded to print message about rebuilding completion
         );
 
+        Collection<GridCacheContext> rejected = null;
+
         for (GridCacheContext cacheCtx : contexts) {
-            if (!rebuildCond.test(cacheCtx))
-                continue;
+            if (rebuildCond.test(cacheCtx)) {
+                IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx, force);
 
-            IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);
+                if (nonNull(rebuildFut))
+                    rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
+                else
+                    rebuildIndexesCompleteCntr.countDown(false);
+            }
+            else {
+                if (rejected == null)
+                    rejected = new ArrayList<>();
 
-            if (nonNull(rebuildFut))
-                rebuildFut.listen(fut -> rebuildIndexesCompleteCntr.countDown(true));
-            else
-                rebuildIndexesCompleteCntr.countDown(false);
+                rejected.add(cacheCtx);
+            }
         }
+
+        return rejected == null ? emptyList() : rejected;
     }
 
     /**
@@ -1492,7 +1534,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         List<Integer> stoppedGrpIds = stoppedGrps.stream()
             .filter(IgniteBiTuple::get2)
             .map(t -> t.get1().groupId())
-            .collect(Collectors.toList());
+            .collect(toList());
 
         cctx.snapshotMgr().onCacheGroupsStopped(stoppedGrpIds);
 
@@ -3545,7 +3587,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                     return cctx.cacheContext(cacheId) != null && cacheGroupPredicate.apply(cctx.cacheContext(cacheId).groupId());
                 })
-                .collect(Collectors.toList());
+                .collect(toList());
 
             return record.setWriteEntries(filteredEntries);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 207ef8c..b6886ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1085,12 +1085,14 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
-     * Performs indexes rebuild for all cache contexts from {@code contexts}.
+     * Initiate an asynchronous forced index rebuild for caches.
      *
-     * @param contexts List of cache contexts.
+     * @param contexts Cache contexts.
+     * @return Cache contexts for which index rebuilding is not initiated by
+     *      this call because they are already in the process of rebuilding.
      */
-    public void forceRebuildIndexes(Collection<GridCacheContext> contexts) {
-        // No-op.
+    public Collection<GridCacheContext> forceRebuildIndexes(Collection<GridCacheContext> contexts) {
+        return Collections.emptyList();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f335444..6f99fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -56,7 +56,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -133,9 +132,12 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.emptySet;
 import static java.util.Collections.newSetFromMap;
+import static java.util.Collections.singleton;
 import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_SCHEMA;
@@ -204,9 +206,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Active propose messages. */
     private final LinkedHashMap<UUID, SchemaProposeDiscoveryMessage> activeProposals = new LinkedHashMap<>();
 
-    /** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
-    private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
-
     /** General state mutex. */
     private final Object stateMux = new Object();
 
@@ -242,6 +241,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Cache name - value typeId pairs for which type mismatch message was logged. */
     private final Set<Long> missedCacheTypes = ConcurrentHashMap.newKeySet();
 
+    /** Index rebuild aware. */
+    private final IndexRebuildAware idxRebuildAware = new IndexRebuildAware();
+
     /**
      * @param ctx Kernal context.
      */
@@ -454,33 +456,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param fut Exchange future.
      */
     public void beforeExchange(GridDhtPartitionsExchangeFuture fut) {
-        ExchangeActions acts = fut.exchangeActions();
-
-        if (acts != null) {
-            if (!F.isEmpty(acts.cacheStartRequests())) {
-                for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
-                    prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
-            }
-            else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
-                for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
-                    prepareIndexRebuildFuture(tup.get1().cacheId());
-            }
-        }
-    }
+        Set<Integer> cacheIds = rebuildIndexCacheIds(fut);
 
-    /**
-     * Creates a new index rebuild future that should be completed later after exchange is done. The future
-     * has to be created before exchange is initialized to guarantee that we will capture a correct future
-     * after activation or restore completes.
-     * If there was an old future for the given ID, it will be completed.
-     *
-     * @param cacheId Cache ID.
-     */
-    public void prepareIndexRebuildFuture(int cacheId) {
-        GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
+        Set<Integer> rejected = idxRebuildAware.prepareRebuildIndexes(cacheIds, fut.initialVersion());
 
-        if (old != null)
-            old.onDone();
+        if (log.isDebugEnabled()) {
+            log.debug("Preparing features of rebuilding indexes for caches on exchange [requested=" + cacheIds +
+                ", rejected=" + rejected + ']');
+        }
     }
 
     /**
@@ -1963,7 +1946,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     registerCache0(op0.cacheName(), op.schemaName(), cacheInfo, candRes.get1(), false);
                 }
 
-                rebuildIndexesFromHash0(cacheInfo.cacheContext());
+                if (idxRebuildAware.prepareRebuildIndexes(singleton(cacheInfo.cacheId()), null).isEmpty())
+                    rebuildIndexesFromHash0(cacheInfo.cacheContext(), false);
+                else {
+                    if (log.isInfoEnabled())
+                        log.info("Rebuilding indexes for the cache is already in progress: " + cacheInfo.name());
+                }
             }
             else
                 throw new SchemaOperationException("Unsupported operation: " + op);
@@ -2328,9 +2316,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * Rebuilds indexes for provided caches from corresponding hash indexes.
      *
      * @param cctx Cache context.
+     * @param force Force rebuild indexes.
      * @return Future that will be completed when rebuilding is finished.
      */
-    public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx) {
+    public IgniteInternalFuture<?> rebuildIndexesFromHash(GridCacheContext cctx, boolean force) {
         assert nonNull(cctx);
 
         // Indexing module is disabled, nothing to rebuild.
@@ -2357,7 +2346,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         try {
-            return rebuildIndexesFromHash0(cctx);
+            return rebuildIndexesFromHash0(cctx, force);
         }
         finally {
             busyLock.leaveBusy();
@@ -2365,10 +2354,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Rebuild indexes for cache.
+     *
      * @param cctx Cache context.
+     * @param force Force rebuild indexes.
      */
-    private IgniteInternalFuture<?> rebuildIndexesFromHash0(GridCacheContext<?, ?> cctx) {
-        IgniteInternalFuture<?> idxFut = idxProc.rebuildIndexesForCache(cctx);
+    private IgniteInternalFuture<?> rebuildIndexesFromHash0(GridCacheContext<?, ?> cctx, boolean force) {
+        IgniteInternalFuture<?> idxFut = idxProc.rebuildIndexesForCache(cctx, force);
 
         return chainIndexRebuildFuture(idxFut, cctx);
     }
@@ -2384,19 +2376,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         @Nullable IgniteInternalFuture<?> idxFut,
         GridCacheContext<?, ?> cctx
     ) {
-        int cacheId = cctx.cacheId();
+        GridFutureAdapter<Void> res = idxRebuildAware.indexRebuildFuture(cctx.cacheId());
 
-        if (nonNull(idxFut)) {
-            GridFutureAdapter<Void> res = idxRebuildFuts.computeIfAbsent(cacheId, id -> new GridFutureAdapter<>());
+        assert res != null;
 
+        if (idxFut != null) {
             String cacheInfo = "[name=" + cctx.name() + ", grpName=" + cctx.group().name() + "]";
 
             if (log.isInfoEnabled())
                 log.info("Started indexes rebuilding for cache " + cacheInfo);
 
             idxFut.listen(fut -> {
-                idxRebuildFuts.remove(cacheId, res);
-
                 Throwable err = fut.error();
 
                 if (isNull(err) && log.isInfoEnabled())
@@ -2404,16 +2394,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 else if (!(err instanceof NodeStoppingException))
                     log.error("Failed to rebuild indexes for cache " + cacheInfo, err);
 
-                res.onDone(err);
+                idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), err);
             });
 
             return res;
         }
         else {
-            GridFutureAdapter<Void> fut = idxRebuildFuts.remove(cacheId);
-
-            if (fut != null)
-                fut.onDone();
+            idxRebuildAware.onFinishRebuildIndexes(cctx.cacheId(), null);
 
             return null;
         }
@@ -2423,7 +2410,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Future that will be completed when indexes for given cache are restored.
      */
     @Nullable public IgniteInternalFuture<?> indexRebuildFuture(int cacheId) {
-        return idxRebuildFuts.get(cacheId);
+        return idxRebuildAware.indexRebuildFuture(cacheId);
     }
 
     /**
@@ -3808,4 +3795,70 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             this.mgr = mgr;
         }
     }
+
+    /**
+     * Removing futures of rebuilding indexes that should have been rebuilt on the exchange.
+     *
+     * @param fut Exchange future.
+     * @param cacheIds Cache ids for which futures will be deleted,
+     *      if {@code null} then ids will be taken from the {@code fut}.
+     */
+    public void removeIndexRebuildFuturesOnExchange(
+        GridDhtPartitionsExchangeFuture fut,
+        @Nullable Set<Integer> cacheIds
+    ) {
+        idxRebuildAware.cancelRebuildIndexesOnExchange(
+            cacheIds != null ? cacheIds : rebuildIndexCacheIds(fut),
+            fut.initialVersion()
+        );
+    }
+
+    /**
+     * Checks that the indexes need to be rebuilt on the exchange.
+     *
+     * @param cacheId Cache id.
+     * @param fut Exchange future.
+     * @return {@code True} if need to rebuild.
+     */
+    public boolean rebuildIndexOnExchange(int cacheId, GridDhtPartitionsExchangeFuture fut) {
+        return idxRebuildAware.rebuildIndexesOnExchange(cacheId, fut.initialVersion());
+    }
+
+    /**
+     * Preparing futures of rebuilding indexes for caches.
+     * The future for the cache will be added only if the previous one is missing or completed.
+     *
+     * @param cacheIds Cache ids.
+     * @return Cache ids for which futures have not been added.
+     */
+    public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds) {
+        return idxRebuildAware.prepareRebuildIndexes(cacheIds, null);
+    }
+
+    /**
+     * Getting cache ids for which the indexes will need to be rebuilt on the exchange.
+     *
+     * @param fut Exchange future.
+     * @return Cache ids.
+     */
+    private Set<Integer> rebuildIndexCacheIds(GridDhtPartitionsExchangeFuture fut) {
+        ExchangeActions acts = fut.exchangeActions();
+
+        Set<Integer> cacheIds = emptySet();
+
+        if (acts != null) {
+            if (!F.isEmpty(acts.cacheStartRequests())) {
+                cacheIds = acts.cacheStartRequests().stream()
+                    .map(d -> CU.cacheId(d.request().cacheName()))
+                    .collect(toSet());
+            }
+            else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
+                cacheIds = acts.localJoinContext().caches().stream()
+                    .map(t2 -> t2.get1().cacheId())
+                    .collect(toSet());
+            }
+        }
+
+        return cacheIds;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
new file mode 100644
index 0000000..0535705
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/IndexRebuildAware.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptySet;
+
+/**
+ * Holds the actual information about the state of rebuilding indexes.
+ * Thread safe.
+ */
+public class IndexRebuildAware {
+    /**
+     * Futures to track the status of index rebuilds.
+     * Mapping: Cache id -> Future.
+     * Guarded by {@code this}.
+     */
+    private final Map<Integer, GridFutureAdapter<Void>> futs = new HashMap<>();
+
+    /**
+     * Topology versions that will require index rebuilding on exchange.
+     * Mapping: Cache id -> Topology version.
+     * Guarded by {@code this}.
+     */
+    private final Map<Integer, AffinityTopologyVersion> tops = new HashMap<>();
+
+    /**
+     * Returns the index rebuild future currently registered for the cache.
+     *
+     * @param cacheId Cache id.
+     * @return Future if rebuilding is in progress or {@code null} if not.
+     */
+    @Nullable public synchronized GridFutureAdapter<Void> indexRebuildFuture(int cacheId) {
+        return futs.get(cacheId);
+    }
+
+    /**
+     * Tells whether an index rebuild is registered for the cache on exactly this exchange.
+     *
+     * @param cacheId Cache id.
+     * @param topVer Topology version of the exchange.
+     * @return {@code True} if the topology version registered for the cache equals {@code topVer}.
+     */
+    public synchronized boolean rebuildIndexesOnExchange(int cacheId, AffinityTopologyVersion topVer) {
+        if (!tops.containsKey(cacheId) || !topVer.equals(tops.get(cacheId)))
+            return false;
+
+        assert futs.get(cacheId) != null;
+
+        return true;
+    }
+
+    /**
+     * Finishes the registered rebuilds of the given caches that are bound to the given exchange.
+     *
+     * @param cacheIds Cache ids.
+     * @param topVer Topology version of the exchange.
+     */
+    public void cancelRebuildIndexesOnExchange(Set<Integer> cacheIds, AffinityTopologyVersion topVer) {
+        if (cacheIds.isEmpty())
+            return;
+
+        synchronized (this) {
+            cacheIds.stream()
+                .filter(id -> tops.containsKey(id) && topVer.equals(tops.get(id)))
+                .forEach(id -> onFinishRebuildIndexes(id, null));
+        }
+    }
+
+    /**
+     * Preparing futures of rebuilding indexes for caches.
+     * The future for the cache will be added only if the previous one is missing or completed.
+     *
+     * @param cacheIds Cache ids.
+     * @param topVer Topology version if rebuilding indexes should be on exchange.
+     * @return Cache ids for which features have not been added.
+     */
+    public Set<Integer> prepareRebuildIndexes(Set<Integer> cacheIds, @Nullable AffinityTopologyVersion topVer) {
+        if (!cacheIds.isEmpty()) {
+            synchronized (this) {
+                Set<Integer> alreadyPrepared = new HashSet<>();
+
+                for (Integer cacheId : cacheIds) {
+                    GridFutureAdapter<Void> prevFut = futs.get(cacheId);
+
+                    if (prevFut == null || prevFut.isDone()) {
+                        if (topVer != null) {
+                            AffinityTopologyVersion prevTopVer = tops.put(cacheId, topVer);
+
+                            assert prevTopVer == null;
+                        }
+
+                        futs.put(cacheId, new GridFutureAdapter<>());
+                    }
+                    else
+                        alreadyPrepared.add(cacheId);
+                }
+
+                return alreadyPrepared;
+            }
+        }
+        else
+            return emptySet();
+    }
+
+    /**
+     * Unregisters the rebuild state of the cache and completes its future when rebuilding has finished.
+     *
+     * @param cacheId Cache id.
+     * @param err Error passed to the future on completion, may be {@code null}.
+     */
+    public synchronized void onFinishRebuildIndexes(int cacheId, @Nullable Throwable err) {
+        GridFutureAdapter<Void> finishedFut = futs.remove(cacheId);
+
+        tops.remove(cacheId);
+
+        assert finishedFut != null;
+
+        finishedFut.onDone(err);
+    }
+}
+
+
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index c4e3159..a7f4c74 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -81,6 +82,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
 import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -90,6 +92,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeMan
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
 import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -2676,4 +2679,39 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
         else
             assertNotNull(logPtr);
     }
+
+    /**
+     * Initiates an asynchronous forced index rebuild for caches via the database manager of the node.
+     *
+     * @param n Node.
+     * @param cacheCtxs Cache contexts.
+     * @return Cache contexts for which index rebuilding is not initiated by
+     *      this call because they are already in the process of rebuilding.
+     */
+    protected Collection<GridCacheContext> forceRebuildIndexes(IgniteEx n, GridCacheContext... cacheCtxs) {
+        return n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtxs));
+    }
+
+    /**
+     * Returns the rebuild index future that the query processor of the node holds for the cache.
+     *
+     * @param n Node.
+     * @param cacheId Cache id.
+     * @return Rebuild index future, may be {@code null}.
+     */
+    @Nullable protected IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) {
+        return n.context().query().indexRebuildFuture(cacheId);
+    }
+
+    /**
+     * Returns the internal metrics of the cache, resolved through its internal cache and context.
+     *
+     * @param n Node.
+     * @param cacheName Cache name.
+     * @return Cache metrics, {@code null} if any link of the lookup chain is {@code null}.
+     */
+    @Nullable protected CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) {
+        return Optional.ofNullable(n.cachex(cacheName)).map(IgniteInternalCache::context)
+            .map(GridCacheContext::cache).map(GridCacheAdapter::metrics0).orElse(null);
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
new file mode 100644
index 0000000..8f012a7
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/ForceRebuildIndexTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import org.apache.ignite.client.Person;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cache.query.index.IndexProcessor;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
+import org.apache.ignite.internal.processors.query.IndexRebuildAware;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static java.util.Collections.emptyList;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
+import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing forced rebuilding of indexes.
+ */
+public class ForceRebuildIndexTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} Also drops this test's rebuild hooks, stops all grids and cleans the persistence dir. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} Also drops this test's rebuild hooks, stops all grids and cleans the persistence dir. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+            ).setCacheConfiguration(
+                new CacheConfiguration<>(DEFAULT_CACHE_NAME).setIndexedTypes(Integer.class, Person.class)
+            );
+    }
+
+    /**
+     * Checking that a forced rebuild of indexes is possible only after the previous one has finished.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSequentialForceRebuildIndexes() throws Exception {
+        IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+        IgniteEx n = prepareCluster(100);
+
+        GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+        StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+        addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
+
+        // The forced rebuild has begun - nothing is rejected.
+        assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+        IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
+
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+        assertFalse(idxRebFut0.isDone());
+
+        // No new forced rebuild is started since the previous one has not ended - the cache is rejected.
+        assertEqualsCollections(F.asList(cacheCtx), forceRebuildIndexes(n, cacheCtx));
+        assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
+
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+        idxRebFut0.get(getTestTimeout());
+
+        checkFinishRebuildIndexes(n, cacheCtx, 100);
+        assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+
+        stopRebuildIdxConsumer.resetFutures();
+
+        // Forced rebuilding is possible again as the previous one is over - nothing is rejected.
+        assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+        IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
+
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+        assertFalse(idxRebFut1.isDone());
+
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+        idxRebFut1.get(getTestTimeout());
+
+        checkFinishRebuildIndexes(n, cacheCtx, 100);
+        assertEquals(200, stopRebuildIdxConsumer.visitCnt.get());
+    }
+
+    /**
+     * Checking that a forced index rebuild is rejected until the rebuild started on exchange finishes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testForceRebuildIndexesAfterExchange() throws Exception {
+        IgniteEx n = prepareCluster(100);
+
+        stopAllGrids();
+        deleteIndexBin(n.context().igniteInstanceName());
+
+        IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+        StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+        addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+
+        n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+
+        IgniteInternalFuture<?> idxRebFut0 = checkStartRebuildIndexes(n, cacheCtx);
+        checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
+
+        // No forced rebuild is started since the rebuild after the exchange has not ended - the cache is rejected.
+        assertEqualsCollections(F.asList(cacheCtx), forceRebuildIndexes(n, cacheCtx));
+        assertTrue(idxRebFut0 == indexRebuildFuture(n, cacheCtx.cacheId()));
+        checkRebuildAfterExchange(n, cacheCtx.cacheId(), true);
+
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+        idxRebFut0.get(getTestTimeout());
+
+        checkFinishRebuildIndexes(n, cacheCtx, 100);
+        assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+        checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+
+        stopRebuildIdxConsumer.resetFutures();
+
+        // A forced rebuild is triggered because the rebuild after the exchange has ended - nothing is rejected.
+        assertEqualsCollections(emptyList(), forceRebuildIndexes(n, cacheCtx));
+
+        IgniteInternalFuture<?> idxRebFut1 = checkStartRebuildIndexes(n, cacheCtx);
+        checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+        assertFalse(idxRebFut1.isDone());
+
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+        idxRebFut1.get(getTestTimeout());
+
+        checkFinishRebuildIndexes(n, cacheCtx, 100);
+        checkRebuildAfterExchange(n, cacheCtx.cacheId(), false);
+        assertEquals(200, stopRebuildIdxConsumer.visitCnt.get());
+    }
+
+    /**
+     * Checking that sequential index rebuilds on exchanges do not intersect.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSequentialRebuildIndexesOnExchange() throws Exception {
+        IgniteEx n = prepareCluster(100);
+
+        stopAllGrids();
+        deleteIndexBin(n.context().igniteInstanceName());
+
+        IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
+
+        StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+        addCacheRowConsumer(nodeName(n), DEFAULT_CACHE_NAME, stopRebuildIdxConsumer);
+
+        n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
+
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
+
+        IgniteInternalFuture<?> idxRebFut = checkStartRebuildIndexes(n, cacheCtx);
+
+        // Creating one more cache to initiate an exchange while the rebuild is still paused.
+        n.getOrCreateCache(DEFAULT_CACHE_NAME + "_1");
+
+        assertTrue(idxRebFut == indexRebuildFuture(n, cacheCtx.cacheId()));
+
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
+
+        idxRebFut.get(getTestTimeout());
+
+        checkFinishRebuildIndexes(n, cacheCtx, 100);
+        assertEquals(100, stopRebuildIdxConsumer.visitCnt.get());
+    }
+
+    /**
+     * Starts node 0, activates the cluster and fills the default cache with {@code keys} entries.
+     *
+     * @param keys Key count.
+     * @return Started node.
+     * @throws Exception If failed.
+     */
+    private IgniteEx prepareCluster(int keys) throws Exception {
+        IgniteEx crd = startGrid(0);
+
+        crd.cluster().state(ACTIVE);
+
+        for (int key = 0; key < keys; key++)
+            crd.cache(DEFAULT_CACHE_NAME).put(key, new Person(key, "name_" + key));
+
+        return crd;
+    }
+
+    /**
+     * Asserts the index rebuild metrics of the cache.
+     *
+     * @param n Node.
+     * @param cacheName Cache name.
+     * @param expIdxRebuildInProgress The expected status of rebuilding indexes.
+     * @param expIdxRebuildKeysProcessed The expected number of keys processed during index rebuilding.
+     */
+    private void checkCacheMetrics0(
+        IgniteEx n,
+        String cacheName,
+        boolean expIdxRebuildInProgress,
+        long expIdxRebuildKeysProcessed
+    ) {
+        CacheMetricsImpl cacheMetrics = cacheMetrics0(n, cacheName);
+
+        assertNotNull(cacheMetrics);
+        assertEquals(expIdxRebuildInProgress, cacheMetrics.isIndexRebuildInProgress());
+        assertEquals(expIdxRebuildKeysProcessed, cacheMetrics.getIndexRebuildKeysProcessed());
+    }
+
+    /**
+     * Asserts that an unfinished rebuild future exists for the cache and metrics report a running rebuild.
+     *
+     * @param n Node.
+     * @param cacheCtx Cache context.
+     * @return Rebuild index future.
+     */
+    private IgniteInternalFuture<?> checkStartRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx) {
+        IgniteInternalFuture<?> rebuildFut = indexRebuildFuture(n, cacheCtx.cacheId());
+
+        assertNotNull(rebuildFut);
+        assertFalse(rebuildFut.isDone());
+
+        checkCacheMetrics0(n, cacheCtx.name(), true, 0);
+
+        return rebuildFut;
+    }
+
+    /**
+     * Checking that no rebuild future is left for the cache and metrics report a finished rebuild.
+     *
+     * @param n Node.
+     * @param cacheCtx Cache context.
+     * @param expKeys The expected number of keys processed during index rebuilding.
+     */
+    private void checkFinishRebuildIndexes(IgniteEx n, GridCacheContext<?, ?> cacheCtx, int expKeys) {
+        assertNull(indexRebuildFuture(n, cacheCtx.cacheId()));
+
+        checkCacheMetrics0(n, cacheCtx.name(), false, expKeys);
+    }
+
+    /**
+     * Checking the state of the {@code idxRebuildAware} field of the query processor for the cache.
+     * Allows to check whether the cache is marked for a rebuild on the version of the last topology future.
+     *
+     * @param n Node.
+     * @param cacheId Cache id.
+     * @param expContains Whether a cache is expected.
+     */
+    private void checkRebuildAfterExchange(IgniteEx n, int cacheId, boolean expContains) {
+        IndexRebuildAware idxRebuildAware = getFieldValue(n.context().query(), "idxRebuildAware");
+
+        GridDhtPartitionsExchangeFuture exchFut = n.context().cache().context().exchange().lastTopologyFuture();
+
+        assertEquals(expContains, idxRebuildAware.rebuildIndexesOnExchange(cacheId, exchFut.initialVersion()));
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
new file mode 100644
index 0000000..65c0b6b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexesRebuildTaskEx.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.index;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * Extension of {@link IndexesRebuildTask} for the tests.
+ */
+class IndexesRebuildTaskEx extends IndexesRebuildTask {
+    /**
+     * Consumer for cache rows when rebuilding indexes on a node.
+     * Mapping: Node name -> Cache name -> Consumer.
+     */
+    private static final Map<String, Map<String, IgniteThrowableConsumer<CacheDataRow>>> cacheRowConsumer =
+        new ConcurrentHashMap<>();
+
+    /**
+     * A function that should run before preparing to rebuild the cache indexes on a node.
+     * Mapping: Node name -> Cache name -> Function.
+     */
+    private static final Map<String, Map<String, Runnable>> cacheRebuildRunner = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} Each row is first passed to the consumer registered for the node and cache, if any. */
+    @Override protected void startRebuild(
+        GridCacheContext cctx,
+        GridFutureAdapter<Void> rebuildIdxFut,
+        SchemaIndexCacheVisitorClosure clo,
+        SchemaIndexOperationCancellationToken cancel
+    ) {
+        super.startRebuild(cctx, rebuildIdxFut, new SchemaIndexCacheVisitorClosure() {
+            /** {@inheritDoc} */
+            @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
+                cacheRowConsumer.getOrDefault(nodeName(cctx), emptyMap())
+                    .getOrDefault(cctx.name(), r -> { }).accept(row);
+
+                clo.apply(row);
+            }
+        }, cancel);
+    }
+
+    /** {@inheritDoc} Runs the function registered for the node and cache, if any, before delegating. */
+    @Override @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+        cacheRebuildRunner.getOrDefault(nodeName(cctx), emptyMap()).getOrDefault(cctx.name(), () -> { }).run();
+
+        return super.rebuild(cctx, force);
+    }
+
+    /**
+     * Removes the registered row consumers and rebuild functions. It is recommended to call it in
+     * {@code GridCommonAbstractTest#beforeTest} and {@code GridCommonAbstractTest#afterTest}.
+     *
+     * @param nodeNamePrefix Prefix of node name ({@link GridKernalContext#igniteInstanceName()})
+     *      whose registrations are removed, if {@code null} registrations of all nodes are removed.
+     *
+     * @see GridCommonAbstractTest#getTestIgniteInstanceName()
+     */
+    static void clean(@Nullable String nodeNamePrefix) {
+        if (nodeNamePrefix != null) {
+            cacheRowConsumer.keySet().removeIf(name -> name.startsWith(nodeNamePrefix));
+            cacheRebuildRunner.keySet().removeIf(name -> name.startsWith(nodeNamePrefix));
+        }
+        else {
+            cacheRowConsumer.clear();
+            cacheRebuildRunner.clear();
+        }
+    }
+
+    /**
+     * Registers a consumer for cache rows, replacing the one already registered for the node and cache.
+     *
+     * @param nodeName The name of the node,
+     *      the value of which will return {@link GridKernalContext#igniteInstanceName()}.
+     * @param cacheName Cache name.
+     * @param c Cache row consumer.
+     *
+     * @see #nodeName(GridKernalContext)
+     * @see #nodeName(IgniteEx)
+     * @see #nodeName(GridCacheContext)
+     * @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
+     */
+    static void addCacheRowConsumer(String nodeName, String cacheName, IgniteThrowableConsumer<CacheDataRow> c) {
+        cacheRowConsumer.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(cacheName, c);
+    }
+
+    /**
+     * Registers a function that should run before preparing to rebuild the cache indexes on a node.
+     *
+     * @param nodeName The name of the node,
+     *      the value of which will return {@link GridKernalContext#igniteInstanceName()}.
+     * @param cacheName Cache name.
+     * @param r A function that should run before preparing to rebuild the cache indexes.
+     *
+     * @see #nodeName(GridKernalContext)
+     * @see #nodeName(IgniteEx)
+     * @see #nodeName(GridCacheContext)
+     * @see GridCommonAbstractTest#getTestIgniteInstanceName(int)
+     */
+    static void addCacheRebuildRunner(String nodeName, String cacheName, Runnable r) {
+        cacheRebuildRunner.computeIfAbsent(nodeName, s -> new ConcurrentHashMap<>()).put(cacheName, r);
+    }
+
+    /**
+     * Returns the local instance name of the node, taken from the kernal context of the cache context.
+     *
+     * @param cacheCtx Cache context.
+     * @return Local instance name.
+     */
+    static String nodeName(GridCacheContext cacheCtx) {
+        return nodeName(cacheCtx.kernalContext());
+    }
+
+    /**
+     * Returns the local instance name of the node, taken from its kernal context.
+     *
+     * @param n Node.
+     * @return Local instance name.
+     */
+    static String nodeName(IgniteEx n) {
+        return nodeName(n.context());
+    }
+
+    /**
+     * Returns the Ignite instance name reported by the kernal context.
+     *
+     * @param kernalCtx Kernal context.
+     * @return Local instance name.
+     */
+    static String nodeName(GridKernalContext kernalCtx) {
+        return kernalCtx.igniteInstanceName();
+    }
+
+    /**
+     * Consumer that holds the rebuilding of cache indexes on a visited row until the finish future is done.
+     */
+    static class StopRebuildIndexConsumer implements IgniteThrowableConsumer<CacheDataRow> {
+        /** Future to indicate that the rebuilding indexes has begun, completed on the first visited row. */
+        final GridFutureAdapter<Void> startRebuildIdxFut = new GridFutureAdapter<>();
+
+        /** Future to wait to continue rebuilding indexes, expected to be completed by the test. */
+        final GridFutureAdapter<Void> finishRebuildIdxFut = new GridFutureAdapter<>();
+
+        /** Counter of visits, it is not cleared by {@link #resetFutures()}. */
+        final AtomicLong visitCnt = new AtomicLong();
+
+        /** The maximum time to wait finish future in milliseconds. */
+        final long timeout;
+
+        /**
+         * Constructor.
+         *
+         * @param timeout The maximum time to wait finish future in milliseconds.
+         */
+        StopRebuildIndexConsumer(long timeout) {
+            this.timeout = timeout;
+        }
+
+        /** {@inheritDoc} Marks the start, counts the visit, then waits for the finish future up to the timeout. */
+        @Override public void accept(CacheDataRow row) throws IgniteCheckedException {
+            startRebuildIdxFut.onDone();
+
+            visitCnt.incrementAndGet();
+
+            finishRebuildIdxFut.get(timeout);
+        }
+
+        /**
+         * Resetting internal futures so that the consumer can be reused for the next rebuild.
+         */
+        void resetFutures() {
+            startRebuildIdxFut.reset();
+            finishRebuildIdxFut.reset();
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
index d308065..04fe0d7 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/StopRebuildIndexTest.java
@@ -18,8 +18,6 @@
 package org.apache.ignite.internal.processors.cache.index;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.client.Person;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -32,16 +30,13 @@ import org.apache.ignite.internal.cache.query.index.IndexProcessor;
 import org.apache.ignite.internal.managers.indexing.IndexesRebuildTask;
 import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.StopRebuildIndexConsumer;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheCompoundFuture;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFuture;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheStat;
-import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationException;
-import org.apache.ignite.internal.processors.query.schema.SchemaIndexOperationCancellationToken;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -50,10 +45,11 @@ import org.junit.Test;
 
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRebuildRunner;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.addCacheRowConsumer;
+import static org.apache.ignite.internal.processors.cache.index.IndexesRebuildTaskEx.nodeName;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
-import static org.apache.ignite.testframework.GridTestUtils.deleteIndexBin;
 import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
@@ -64,8 +60,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        IndexesRebuildTaskEx.cacheRowConsumer.clear();
-        IndexesRebuildTaskEx.cacheRebuildRunner.clear();
+        IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
 
         stopAllGrids();
         cleanPersistenceDir();
@@ -75,8 +70,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        IndexesRebuildTaskEx.cacheRowConsumer.clear();
-        IndexesRebuildTaskEx.cacheRebuildRunner.clear();
+        IndexesRebuildTaskEx.clean(getTestIgniteInstanceName());
 
         stopAllGrids();
         cleanPersistenceDir();
@@ -186,21 +180,18 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
 
         IgniteEx n = prepareCluster(10);
 
-        GridFutureAdapter<?> f0 = new GridFutureAdapter<>();
-        GridFutureAdapter<?> f1 = new GridFutureAdapter<>();
-
         GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
 
-        IndexesRebuildTaskEx.cacheRebuildRunner.put(
-            DEFAULT_CACHE_NAME, () -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId())));
-
-        IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
-            f0.onDone();
+        addCacheRebuildRunner(
+            nodeName(n),
+            cacheCtx.name(),
+            () -> assertNull(internalIndexRebuildFuture(n, cacheCtx.cacheId()))
+        );
 
-            f1.get(getTestTimeout());
-        });
+        StopRebuildIndexConsumer stopRebuildIdxConsumer = new StopRebuildIndexConsumer(getTestTimeout());
+        addCacheRowConsumer(nodeName(n), cacheCtx.name(), stopRebuildIdxConsumer);
 
-        n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+        forceRebuildIndexes(n, cacheCtx);
 
         IgniteInternalFuture<?> rebFut0 = indexRebuildFuture(n, cacheCtx.cacheId());
         assertNotNull(rebFut0);
@@ -208,13 +199,13 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
         SchemaIndexCacheFuture rebFut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
         assertNotNull(rebFut1);
 
-        f0.get(getTestTimeout());
+        stopRebuildIdxConsumer.startRebuildIdxFut.get(getTestTimeout());
         assertFalse(rebFut0.isDone());
 
         assertFalse(rebFut1.isDone());
         assertFalse(rebFut1.cancelToken().isCancelled());
 
-        f1.onDone();
+        stopRebuildIdxConsumer.finishRebuildIdxFut.onDone();
 
         rebFut0.get(getTestTimeout());
         rebFut1.get(getTestTimeout());
@@ -226,87 +217,6 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Checks that when starting an index rebuild sequentially,
-     * the previous rebuild will be canceled and a new one will start.
-     *
-     * This behavior should be discussed in IGNITE-14321.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testSequentialRebuildIndexes() throws Exception {
-        IgniteEx n = prepareCluster(10);
-
-        int cacheId = n.cachex(DEFAULT_CACHE_NAME).context().cacheId();
-        int cacheSize = n.cachex(DEFAULT_CACHE_NAME).size();
-
-        stopAllGrids();
-        deleteIndexBin(n.context().igniteInstanceName());
-
-        IndexProcessor.idxRebuildCls = IndexesRebuildTaskEx.class;
-
-        GridFutureAdapter<?> startBlockRebIdxFut0 = new GridFutureAdapter<>();
-        GridFutureAdapter<?> endBlockRebIdxFut0 = new GridFutureAdapter<>();
-
-        IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
-            IgniteInternalFuture<?> fut = indexRebuildFuture(grid(0), cacheId);
-            assertNotNull(fut);
-            assertFalse(fut.isDone());
-
-            startBlockRebIdxFut0.onDone();
-            endBlockRebIdxFut0.get(getTestTimeout());
-        });
-
-        n = startGrid(0);
-
-        n.cluster().state(ACTIVE);
-        awaitPartitionMapExchange();
-
-        startBlockRebIdxFut0.get(getTestTimeout());
-
-        IgniteInternalFuture<?> rebIdxFut = indexRebuildFuture(n, cacheId);
-        assertNotNull(rebIdxFut);
-        assertFalse(rebIdxFut.isDone());
-
-        SchemaIndexCacheFuture intRebIdxFut = internalIndexRebuildFuture(n, cacheId);
-        assertNotNull(intRebIdxFut);
-        assertFalse(intRebIdxFut.isDone());
-        assertFalse(intRebIdxFut.cancelToken().isCancelled());
-
-        GridFutureAdapter<IgniteInternalFuture<?>> forceRebIdxFut = new GridFutureAdapter<>();
-
-        IgniteInternalFuture<?> startForceRebIdxFut = runAsync(() -> {
-            IgniteEx n0 = grid(0);
-
-            IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
-                forceRebIdxFut.onDone(internalIndexRebuildFuture(n0, cacheId));
-            });
-
-            n0.context().cache().context().database().forceRebuildIndexes(
-                F.asList(n0.cachex(DEFAULT_CACHE_NAME).context()));
-
-            return null;
-        });
-
-        assertTrue(waitForCondition(intRebIdxFut.cancelToken()::isCancelled, getTestTimeout()));
-        endBlockRebIdxFut0.onDone();
-
-        rebIdxFut.get(getTestTimeout());
-
-        assertThrows(
-            log,
-            () -> intRebIdxFut.get(getTestTimeout()),
-            SchemaIndexOperationCancellationException.class,
-            null
-        );
-
-        startForceRebIdxFut.get(getTestTimeout());
-        forceRebIdxFut.get(getTestTimeout()).get(getTestTimeout());
-
-        assertEquals(cacheSize, cacheMetrics0(n, DEFAULT_CACHE_NAME).getIndexRebuildKeysProcessed());
-    }
-
-    /**
      * Restart the rebuild of the indexes, checking that it completes gracefully.
      *
      * @param stopRebuildIndexes Stop index rebuild function.
@@ -322,12 +232,11 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
         int keys = 100_000;
         IgniteEx n = prepareCluster(keys);
 
-        IndexesRebuildTaskEx.cacheRowConsumer.put(DEFAULT_CACHE_NAME, row -> {
-            U.sleep(10);
-        });
-
         GridCacheContext<?, ?> cacheCtx = n.cachex(DEFAULT_CACHE_NAME).context();
-        n.context().cache().context().database().forceRebuildIndexes(F.asList(cacheCtx));
+
+        addCacheRowConsumer(nodeName(n), cacheCtx.name(), row -> U.sleep(10));
+
+        forceRebuildIndexes(n, cacheCtx);
 
         IgniteInternalFuture<?> fut0 = indexRebuildFuture(n, cacheCtx.cacheId());
         assertNotNull(fut0);
@@ -335,7 +244,7 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
         SchemaIndexCacheFuture fut1 = internalIndexRebuildFuture(n, cacheCtx.cacheId());
         assertNotNull(fut1);
 
-        CacheMetricsImpl metrics0 = cacheMetrics0(n, DEFAULT_CACHE_NAME);
+        CacheMetricsImpl metrics0 = cacheMetrics0(n, cacheCtx.name());
         assertTrue(metrics0.isIndexRebuildInProgress());
         assertFalse(fut0.isDone());
 
@@ -399,62 +308,4 @@ public class StopRebuildIndexTest extends GridCommonAbstractTest {
 
         return ((Map<Integer, SchemaIndexCacheFuture>)getFieldValueHierarchy(idxRebuild, "idxRebuildFuts")).get(cacheId);
     }
-
-    /**
-     * Getting rebuild index future for the cache.
-     *
-     * @param n Node.
-     * @param cacheId Cache id.
-     * @return Rebuild index future.
-     */
-    @Nullable private IgniteInternalFuture<?> indexRebuildFuture(IgniteEx n, int cacheId) {
-        return n.context().query().indexRebuildFuture(cacheId);
-    }
-
-    /**
-     * Getting cache metrics.
-     *
-     * @param n Node.
-     * @param cacheName Cache name.
-     * @return Cache metrics.
-     */
-    private CacheMetricsImpl cacheMetrics0(IgniteEx n, String cacheName) {
-        return n.cachex(cacheName).context().cache().metrics0();
-    }
-
-    /**
-     * Extension {@link IndexesRebuildTask} for the test.
-     */
-    private static class IndexesRebuildTaskEx extends IndexesRebuildTask {
-        /** Consumer for cache rows when rebuilding indexes. */
-        private static final Map<String, IgniteThrowableConsumer<CacheDataRow>> cacheRowConsumer =
-            new ConcurrentHashMap<>();
-
-        /** A function that should run before preparing to rebuild the cache indexes. */
-        private static final Map<String, Runnable> cacheRebuildRunner = new ConcurrentHashMap<>();
-
-        /** {@inheritDoc} */
-        @Override protected void startRebuild(
-            GridCacheContext cctx,
-            GridFutureAdapter<Void> rebuildIdxFut,
-            SchemaIndexCacheVisitorClosure clo,
-            SchemaIndexOperationCancellationToken cancel
-        ) {
-            super.startRebuild(cctx, rebuildIdxFut, new SchemaIndexCacheVisitorClosure() {
-                /** {@inheritDoc} */
-                @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
-                    cacheRowConsumer.getOrDefault(cctx.name(), r -> {}).accept(row);
-
-                    clo.apply(row);
-                }
-            }, cancel);
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
-            cacheRebuildRunner.getOrDefault(cctx.name(), () -> {}).run();
-
-            return super.rebuild(cctx);
-        }
-    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
index f3b09c6..61e513b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsIndexingDefragmentationTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.verify.ValidateIndexesClosure;
 import org.apache.ignite.internal.visor.verify.VisorValidateIndexesJobResult;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -308,8 +309,8 @@ public class IgnitePdsIndexingDefragmentationTest extends IgnitePdsDefragmentati
         private boolean rebuiltIndexes;
 
         /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<?> rebuild(GridCacheContext cctx) {
-            IgniteInternalFuture<?> future = super.rebuild(cctx);
+        @Override @Nullable public IgniteInternalFuture<?> rebuild(GridCacheContext cctx, boolean force) {
+            IgniteInternalFuture<?> future = super.rebuild(cctx, force);
             rebuiltIndexes = future != null;
 
             return future;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index cdd352b..593f5df 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -21,6 +21,7 @@ import org.apache.ignite.internal.encryption.CacheGroupReencryptionTest;
 import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingAndGroupPutGetPersistenceSelfTest;
 import org.apache.ignite.internal.processors.cache.IgnitePdsSingleNodeWithIndexingPutGetPersistenceTest;
 import org.apache.ignite.internal.processors.cache.index.ClientReconnectWithSqlTableConfiguredTest;
+import org.apache.ignite.internal.processors.cache.index.ForceRebuildIndexTest;
 import org.apache.ignite.internal.processors.cache.index.StopRebuildIndexTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsIndexingDefragmentationTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgniteTcBotInitNewPageTest;
@@ -64,7 +65,8 @@ import org.junit.runners.Suite;
     MultipleParallelCacheDeleteDeadlockTest.class,
     CacheGroupReencryptionTest.class,
     IgnitePdsIndexingDefragmentationTest.class,
-    StopRebuildIndexTest.class
+    StopRebuildIndexTest.class,
+    ForceRebuildIndexTest.class
 })
 public class IgnitePdsWithIndexingTestSuite {
 }