You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/04/15 21:38:22 UTC

[ignite] branch master updated: IGNITE-14528 Fixed race between rebalance and checkpoint which led to assertion error in GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture. Fixes #9003

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3561e87  IGNITE-14528 Fixed race between rebalance and checkpoint which led to assertion error in GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture. Fixes #9003
3561e87 is described below

commit 3561e87a3490165c81353195bc2b2f84825226cd
Author: denis-chudov <mo...@gmail.com>
AuthorDate: Fri Apr 16 00:36:36 2021 +0300

    IGNITE-14528 Fixed race between rebalance and checkpoint which led to assertion error in GridDhtPartitionDemander$RebalanceFuture.ownPartitionsAndFinishFuture. Fixes #9003
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../cache/GridCachePartitionExchangeManager.java   |  10 +-
 .../processors/cache/GridCachePreloader.java       |   3 +-
 .../cache/GridCachePreloaderAdapter.java           |   2 +-
 .../processors/cache/GridCacheProcessor.java       |   2 +-
 .../dht/preloader/FinishPreloadingTask.java        |  21 ++-
 .../dht/preloader/GridDhtPartitionDemander.java    |  85 +++++----
 .../dht/preloader/GridDhtPreloader.java            |   4 +-
 .../rebalancing/RebalanceStatisticsTest.java       |   3 +-
 .../persistence/db/wal/IgniteWalRebalanceTest.java | 195 ++++++++++++++++++++-
 9 files changed, 268 insertions(+), 57 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1f27819..64bf456 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1278,9 +1278,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param topVer Topology version.
      * @param grpId Group id.
+     * @param rebalanceId Rebalance id.
      */
-    public void finishPreloading(AffinityTopologyVersion topVer, int grpId) {
-        exchWorker.finishPreloading(topVer, grpId);
+    public void finishPreloading(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
+        exchWorker.finishPreloading(topVer, grpId, rebalanceId);
     }
 
     /**
@@ -3038,9 +3039,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /**
          * @param topVer Topology version.
          * @param grpId Group id.
+         * @param rebalanceId Rebalance id.
          */
-        void finishPreloading(AffinityTopologyVersion topVer, int grpId) {
-            futQ.add(new FinishPreloadingTask(topVer, grpId));
+        void finishPreloading(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
+            futQ.add(new FinishPreloadingTask(topVer, grpId, rebalanceId));
         }
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c5b0b4a..4da69ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -237,6 +237,7 @@ public interface GridCachePreloader {
      * Finish preloading for given topology version.
      *
      * @param topVer Topology version.
+     * @param rebalanceId Rebalance id.
      */
-    public void finishPreloading(AffinityTopologyVersion topVer);
+    public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 783f8e5..31d209b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -205,7 +205,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public void finishPreloading(AffinityTopologyVersion topVer) {
+    @Override public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
         // No-op.
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6b5e493..507a35a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -459,7 +459,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             CacheGroupContext grp = cacheGroup(task0.groupId());
 
             if (grp != null)
-                grp.preloader().finishPreloading(task0.topologyVersion());
+                grp.preloader().finishPreloading(task0.topologyVersion(), task0.rebalanceId());
         }
         else
             U.warn(log, "Unsupported custom exchange task: " + task);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
index b0ae758..a931259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/FinishPreloadingTask.java
@@ -24,22 +24,22 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT
  * A task for finishing preloading future in exchange worker thread.
  */
 public class FinishPreloadingTask implements CachePartitionExchangeWorkerTask {
-    /**
-     * Topology version.
-     */
+    /** Topology version. */
     private final AffinityTopologyVersion topVer;
 
-    /**
-     * Group id.
-     */
+    /** Group id. */
     private final int grpId;
 
+    /** Rebalance id. */
+    private final long rebalanceId;
+
     /**
      * @param topVer Topology version.
      */
-    public FinishPreloadingTask(AffinityTopologyVersion topVer, int grpId) {
+    public FinishPreloadingTask(AffinityTopologyVersion topVer, int grpId, long rebalanceId) {
         this.grpId = grpId;
         this.topVer = topVer;
+        this.rebalanceId = rebalanceId;
     }
 
     /**
@@ -62,4 +62,11 @@ public class FinishPreloadingTask implements CachePartitionExchangeWorkerTask {
     public int groupId() {
         return grpId;
     }
+
+    /**
+     * @return Rebalance id.
+     */
+    public long rebalanceId() {
+        return rebalanceId;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 10b9969..c012b5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -1323,7 +1323,7 @@ public class GridDhtPartitionDemander {
 
             if (isDone()) {
                 assert !result() : "Rebalance future was done, but partitions never requested [grp="
-                    + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + "]";
+                    + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']';
 
                 return;
             }
@@ -1348,7 +1348,7 @@ public class GridDhtPartitionDemander {
 
                     U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
                         + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
-                        + ", topVer=" + topologyVersion() + "]");
+                        + ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
                 }
 
                 if (!parts.isEmpty()) {
@@ -1415,7 +1415,8 @@ public class GridDhtPartitionDemander {
                         ", topVer=" + topVer +
                         ", supplier=" + supplierNode.id() +
                         ", fullPartitions=" + S.compact(parts.fullSet()) +
-                        ", histPartitions=" + S.compact(parts.historicalSet()) + ']');
+                        ", histPartitions=" + S.compact(parts.historicalSet()) +
+                        ", rebalanceId=" + rebalanceId + ']');
 
                 ctx.io().sendOrderedMessage(supplierNode, msg.topic(),
                     msg.convertIfNeeded(supplierNode.version()), grp.ioPolicy(), msg.timeout());
@@ -1443,36 +1444,11 @@ public class GridDhtPartitionDemander {
             }
         }
 
-        /** {@inheritDoc} */
-        @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
-            if (super.onDone(res, err)) {
-                if (!isInitial()) {
-                    sendRebalanceFinishedEvent();
-
-                    if (log.isInfoEnabled())
-                        log.info("Completed rebalance future: " + this);
-
-                    // Complete sync future only if rebalancing was not cancelled.
-                    if (res && !grp.preloader().syncFuture().isDone())
-                        ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
-
-                    if (isChainFinished())
-                        onChainFinished();
-                }
-
-                if (next != null)
-                    next.requestPartitions(); // Process next group.
-
-                return true;
-            }
-
-            return false;
-        }
-
         /**
          * @param topVer Rebalancing topology version.
+         * @param rebalanceId Rebalance id.
          */
-        public void ownPartitionsAndFinishFuture(AffinityTopologyVersion topVer) {
+        public void ownPartitionsAndFinishFuture(AffinityTopologyVersion topVer, long rebalanceId) {
             // Ignore all client exchanges.
             // Note rebalancing may be started on client topology version if forced reassign was queued after client
             // topology exchange.
@@ -1489,6 +1465,16 @@ public class GridDhtPartitionDemander {
                 return;
             }
 
+            if (this.rebalanceId != rebalanceId) {
+                if (log.isInfoEnabled()) {
+                    log.info("Received preloading task with wrong rebalanceId, ignoring it [rebalanceId=" +
+                        this.rebalanceId + ", finishPreloadingTaskRebalanceId=" + rebalanceId + ", grpId=" +
+                        grp.groupId() + ", topVer=" + topVer + ']');
+                }
+
+                return;
+            }
+
             if (onDone(true, null)) {
                 assert state == RebalanceFutureState.STARTED : this;
 
@@ -1500,7 +1486,7 @@ public class GridDhtPartitionDemander {
 
                 if (log.isDebugEnabled())
                     log.debug("Partitions have been scheduled to resend [reason=" +
-                        "Group durability restored, name=" + grp.cacheOrGroupName() + "]");
+                        "Group durability restored, name=" + grp.cacheOrGroupName() + ']');
 
                 ctx.exchange().refreshPartitions(Collections.singleton(grp));
             }
@@ -1534,11 +1520,17 @@ public class GridDhtPartitionDemander {
         }
 
         /**
-         * Cancel running future or mark for cancel {@code RebalanceFutureState#MARK_CANCELLED}.
+         * Cancel running future or mark for cancel {@code RebalanceFutureState#MARK_CANCELLED}, if it is not started yet.
          */
         private void tryCancel() {
-            if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED))
+            if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED)) {
+                U.log(log, "Rebalancing marked as cancelled [grp=" + grp.cacheOrGroupName() +
+                    ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
+
+                // Don't call #cancel() for this future from INIT state, as it will trigger #requestPartitions()
+                // for #next future.
                 return;
+            }
 
             cancel();
         }
@@ -1559,7 +1551,7 @@ public class GridDhtPartitionDemander {
                         return true;
 
                     U.log(log, "Cancelled rebalancing from all nodes [grp=" + grp.cacheOrGroupName() +
-                        ", topVer=" + topologyVersion() + "]");
+                        ", topVer=" + topologyVersion() + ", rebalanceId=" + rebalanceId + ']');
 
                     if (!ctx.kernalContext().isStopping()) {
                         for (UUID nodeId : remaining.keySet())
@@ -1590,6 +1582,23 @@ public class GridDhtPartitionDemander {
                     lastCancelledTime.accumulateAndGet(System.currentTimeMillis(), Math::max);
                 else if (startTime != -1)
                     endTime = System.currentTimeMillis();
+
+                if (log != null && log.isInfoEnabled() && !isInitial())
+                    log.info("Completed rebalance future: " + this + (isFailed() ? ", error=" + err : ""));
+
+                if (!isInitial()) {
+                    sendRebalanceFinishedEvent();
+
+                    // Complete sync future only if rebalancing was not cancelled.
+                    if (res && !grp.preloader().syncFuture().isDone())
+                        ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
+
+                    if (isChainFinished())
+                        onChainFinished();
+                }
+
+                if (next != null)
+                    next.requestPartitions(); // Process next group.
             }
 
             return byThisCall;
@@ -1757,7 +1766,7 @@ public class GridDhtPartitionDemander {
                     cp.onStateChanged(PAGE_SNAPSHOT_TAKEN, () -> grp.localWalEnabled(true, false));
 
                     cp.onStateChanged(FINISHED, () -> {
-                        ctx.exchange().finishPreloading(topVer, grp.groupId());
+                        ctx.exchange().finishPreloading(topVer, grp.groupId(), rebalanceId);
                     });
                 }
                 else {
@@ -2001,7 +2010,7 @@ public class GridDhtPartitionDemander {
                     ", fullEntries=" + fullEntries +
                     ", fullBytesRcvd=" + U.humanReadableByteCount(fullBytes) +
                     ", topVer=" + topologyVersion() +
-                    ", progress=" + (routines - remainingRoutines) + "/" + routines + "]");
+                    ", progress=" + (routines - remainingRoutines) + "/" + routines + ']');
             }
             catch (Throwable t) {
                 U.error(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
@@ -2021,9 +2030,9 @@ public class GridDhtPartitionDemander {
     /**
      * @param topVer Topopolog verion.
      */
-    void finishPreloading(AffinityTopologyVersion topVer) {
+    void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
         assert !rebalanceFut.isInitial() : topVer;
 
-        rebalanceFut.ownPartitionsAndFinishFuture(topVer);
+        rebalanceFut.ownPartitionsAndFinishFuture(topVer, rebalanceId);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 925f7a8..0e6222d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -558,12 +558,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void finishPreloading(AffinityTopologyVersion topVer) {
+    @Override public void finishPreloading(AffinityTopologyVersion topVer, long rebalanceId) {
         if (!enterBusy())
             return;
 
         try {
-            demander.finishPreloading(topVer);
+            demander.finishPreloading(topVer, rebalanceId);
         }
         finally {
             leaveBusy();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
index 9e02a72..304eea5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/RebalanceStatisticsTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static java.util.Arrays.asList;
 import static java.util.Objects.requireNonNull;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
@@ -180,7 +181,7 @@ public class RebalanceStatisticsTest extends GridCommonAbstractTest {
                 };
 
                 assertTrue(
-                    supplierMsgs.toString(),
+                    "msgs=" + supplierMsgs.toString() + ", checVals=" + asList(checVals).toString(),
                     supplierMsgs.stream().anyMatch(s -> Stream.of(checVals).allMatch(s::contains))
                 );
             }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 7567705..f43da2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -31,6 +31,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -64,7 +65,11 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
@@ -75,6 +80,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAhea
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -90,8 +96,11 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
 
 /**
  * Historical WAL rebalance base test.
@@ -140,8 +149,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
             .setDefaultDataRegionConfiguration(
                 new DataRegionConfiguration()
                     .setPersistenceEnabled(true)
-                    .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
-            );
+                    .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE));
 
         cfg.setDataStorageConfiguration(dbCfg);
 
@@ -978,6 +986,189 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that owning partitions (that are triggered by a rebalance future) cannot be mapped to a new rebalance future
+     * that was created by RebalanceReassignExchangeTask.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceReassignAndOwnPartitions() throws Exception {
+        backups = 3;
+
+        IgniteEx supplier1 = startGrid(0);
+        IgniteEx supplier2 = startGrid(1);
+        IgniteEx demander = startGrid(2);
+
+        supplier1.cluster().state(ACTIVE);
+
+        String cacheName1 = "test-cache-1";
+        String cacheName2 = "test-cache-2";
+
+        IgniteCache<Integer, IndexedObject> c1 = supplier1.getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>(cacheName1)
+                .setBackups(backups)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setRebalanceOrder(10));
+
+        IgniteCache<Integer, IndexedObject> c2 = supplier1.getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>(cacheName2)
+                .setBackups(backups)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setRebalanceOrder(20));
+
+        // Fill initial data.
+        final int entryCnt = PARTS_CNT * 200;
+        final int preloadEntryCnt = PARTS_CNT * 400;
+
+        for (int k = 0; k < preloadEntryCnt; k++) {
+            c1.put(k, new IndexedObject(k));
+
+            c2.put(k, new IndexedObject(k));
+        }
+
+        forceCheckpoint();
+
+        stopGrid(2);
+
+        // Rewrite data to trigger further rebalance.
+        // Make sure that all partitions will be updated in order to disable wal locally for preloading.
+        // Updating entryCnt keys allows to trigger historical rebalance.
+        // This is an easy way to emulate missing partitions on the first rebalance.
+        for (int i = 0; i < entryCnt; i++)
+            c1.put(i, new IndexedObject(i));
+
+        // Full rebalance for the cacheName2.
+        for (int i = 0; i < preloadEntryCnt; i++)
+            c2.put(i, new IndexedObject(i));
+
+        // Delay rebalance process for specified groups.
+        blockMsgPred = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2);
+            }
+
+            return false;
+        };
+
+        // Emulate missing partitions and trigger RebalanceReassignExchangeTask which should re-trigger a new rebalance.
+        FailingIOFactory ioFactory = injectFailingIOFactory(supplier1);
+
+        demander = startGrid(2);
+
+        TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(grid(2));
+
+        // Wait until demander starts rebalancing.
+        demanderSpi.waitForBlocked();
+
+        // Need to start a client node in order to block RebalanceReassignExchangeTask (without changing the affinity)
+        // until cacheName2 triggers a checkpoint after rebalancing.
+        CountDownLatch blockClientJoin = new CountDownLatch(1);
+        CountDownLatch unblockClientJoin = new CountDownLatch(1);
+
+        demander.context().cache().context().exchange().registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                blockClientJoin.countDown();
+
+                try {
+                    if (!unblockClientJoin.await(getTestTimeout(), MILLISECONDS))
+                        throw new IgniteException("Failed to wait for client node joinning the cluster.");
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteException("Unexpected exception.", e);
+                }
+            }
+        });
+
+        startClientGrid(4);
+
+        // Wait for a checkpoint after rebalancing cacheName2.
+        CountDownLatch blockCheckpoint = new CountDownLatch(1);
+        CountDownLatch unblockCheckpoint = new CountDownLatch(1);
+
+        ((GridCacheDatabaseSharedManager) demander
+            .context()
+            .cache()
+            .context()
+            .database())
+            .addCheckpointListener(new CheckpointListener() {
+                /** {@inheritDoc} */
+                @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+                    if (!ctx.progress().reason().contains(String.valueOf(CU.cacheId(cacheName2))))
+                        return;
+
+                    blockCheckpoint.countDown();
+
+                    try {
+                        if (!unblockCheckpoint.await(getTestTimeout(), MILLISECONDS))
+                            throw new IgniteCheckedException("Failed to wait for unblocking checkpointer.");
+                    }
+                    catch (InterruptedException e) {
+                        throw new IgniteCheckedException("Unexpected exception", e);
+                    }
+                }
+
+                /** {@inheritDoc} */
+                @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+                }
+            });
+
+        // Unblock the first rebalance.
+        demanderSpi.stopBlock();
+
+        // Wait for start of the checkpoint after rebalancing cacheName2.
+        assertTrue("Failed to wait for checkpoint.", blockCheckpoint.await(getTestTimeout(), MILLISECONDS));
+
+        // Block the second rebalancing.
+        demanderSpi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(cacheName1);
+            }
+
+            return false;
+        });
+
+        ioFactory.reset();
+
+        // Let's unblock client exchange and, therefore, handling of RebalanceReassignExchangeTask,
+        // which is already scheduled.
+        unblockClientJoin.countDown();
+
+        // Wait for the second rebalance to start (a new chain of rebalance futures should be created at this point).
+        demanderSpi.waitForBlocked();
+
+        GridFutureAdapter checkpointFut = ((GridCacheDatabaseSharedManager) demander
+            .context()
+            .cache()
+            .context()
+            .database())
+            .getCheckpointer()
+            .currentProgress()
+            .futureFor(FINISHED);
+
+        // Unblock checkpointer.
+        unblockCheckpoint.countDown();
+
+        assertTrue(
+            "Failed to wait for a checkpoint.",
+            GridTestUtils.waitForCondition(() -> checkpointFut.isDone(), getTestTimeout()));
+
+        // There is a race between unblocking the rebalance and the current checkpoint executing all its listeners.
+        demanderSpi.stopBlock();
+
+        awaitPartitionMapExchange(false, true, null);
+    }
+
+    /**
      * Injects a new instance of FailingIOFactory into wal manager for the given supplier node.
      * This allows to break historical rebalance from the supplier.
      *