Posted to commits@ignite.apache.org by as...@apache.org on 2020/05/15 10:44:26 UTC

[ignite] branch master updated: IGNITE-11147 Rebalancing is not cancelled if topologies are compatible. - Fixes #7428.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new a6d3f52  IGNITE-11147 Rebalancing is not cancelled if topologies are compatible. - Fixes #7428.
a6d3f52 is described below

commit a6d3f52e41dfe602a3654cc67c59f460fd0dfd4e
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri May 15 13:35:13 2020 +0300

    IGNITE-11147 Rebalancing is not cancelled if topologies are compatible. - Fixes #7428.
    
    Signed-off-by: Aleksei Scherbakov <as...@apache.org>
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   9 +
 .../affinity/GridAffinityAssignmentCache.java      |   4 +-
 .../cache/GridCachePartitionExchangeManager.java   | 108 +--
 .../processors/cache/GridCachePreloader.java       |  16 +-
 .../cache/GridCachePreloaderAdapter.java           |  11 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |   2 +-
 .../internal/processors/cache/WalStateManager.java | 178 +++--
 .../preloader/GridDhtPartitionDemandMessage.java   |  29 +-
 .../dht/preloader/GridDhtPartitionDemander.java    | 777 ++++++++++++---------
 .../preloader/GridDhtPartitionsExchangeFuture.java |  32 +-
 .../preloader/GridDhtPartitionsFullMessage.java    |   2 +-
 .../dht/preloader/GridDhtPreloader.java            |  62 +-
 .../dht/preloader/GridDhtPreloaderAssignments.java |  17 +-
 .../dht/topology/GridClientPartitionTopology.java  |   4 +-
 .../dht/topology/GridDhtPartitionTopologyImpl.java |  71 +-
 .../ignite/cache/BreakRebalanceChainTest.java      | 176 +++++
 .../ignite/cache/NotOptimizedRebalanceTest.java    | 288 ++++++++
 .../ignite/cache/RebalanceCancellationTest.java    | 495 +++++++++++++
 .../ignite/cache/affinity/PendingExchangeTest.java | 343 +++++++++
 .../IgniteCacheConfigurationTemplateTest.java      |   2 +-
 .../CacheLateAffinityAssignmentTest.java           |  38 +-
 ...gniteCacheClientNodePartitionsExchangeTest.java |  12 +-
 .../dht/topology/EvictPartitionInLogTest.java      |  48 +-
 ...ocalWalModeChangeDuringRebalancingSelfTest.java |   3 -
 .../persistence/db/wal/IgniteWalRebalanceTest.java | 171 ++---
 .../snapshot/IgniteClusterSnapshotSelfTest.java    |   7 +-
 .../TxCrossCacheMapOnInvalidTopologyTest.java      |   3 +-
 .../platform/PlatformCacheAffinityVersionTask.java |  81 +++
 .../junits/common/GridCommonAbstractTest.java      |   1 -
 .../ignite/testsuites/IgniteCacheTestSuite6.java   |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   6 +
 .../near/IgniteCacheQueryNodeRestartSelfTest.java  |  22 +-
 ...ngingBaselineCacheQueryNodeRestartSelfTest.java |   5 +-
 ...ableBaselineCacheQueryNodeRestartsSelfTest.java |  34 +
 .../Compute/ComputeApiTest.cs                      |  12 +-
 .../EventsTestLocalListeners.cs                    |  41 +-
 .../Apache.Ignite.Core.Tests/TestUtils.Common.cs   |  43 ++
 37 files changed, 2461 insertions(+), 694 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 66c3811..ee5b2d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -974,6 +974,15 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING";
 
     /**
+     * When this property is set to {@code false}, each exchange is compared with the previous one:
+     * if the last rebalance is equivalent to the new one, a new rebalance is not triggered.
+     * When the property is set to {@code true}, every exchange triggers a new rebalance.
+     *
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION = "IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION";
+
+    /**
      * Sets timeout for TCP client recovery descriptor reservation.
      */
     public static final String IGNITE_NIO_RECOVERY_DESCRIPTOR_RESERVATION_TIMEOUT =
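
As a usage note: the new IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION flag is a regular Ignite system property, so it can presumably be set either via a -D JVM argument or programmatically before node start. A minimal sketch (only the property name comes from the diff above; the rest is standard Ignite bootstrap code):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteSystemProperties;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class DisableRebalanceOptimizationExample {
        public static void main(String[] args) {
            // Revert to the old behaviour: every exchange triggers a new rebalance.
            // Equivalent to passing -DIGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION=true to the JVM.
            System.setProperty(
                IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, "true");

            try (Ignite ignite = Ignition.start(new IgniteConfiguration())) {
                // The node now cancels and restarts rebalancing on every exchange.
            }
        }
    }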
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index f2037e5..1c8db77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -997,7 +997,7 @@ public class GridAffinityAssignmentCache {
     /**
      * @return All initialized versions.
      */
-    public Collection<AffinityTopologyVersion> cachedVersions() {
+    public NavigableSet<AffinityTopologyVersion> cachedVersions() {
         return affCache.keySet();
     }
 
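The only functional change here is the widened return type: a NavigableSet exposes the cached affinity versions in order, which presumably lets callers look up the nearest cached version without scanning the whole collection. A generic illustration of that capability, with plain integers standing in for AffinityTopologyVersion:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class NavigableVersionsSketch {
        public static void main(String[] args) {
            // Stand-in for the ordered set of cached affinity versions.
            NavigableSet<Integer> cachedVersions = new TreeSet<>();
            cachedVersions.add(3);
            cachedVersions.add(5);
            cachedVersions.add(8);

            int target = 7;

            // A plain Collection would require a manual scan; a NavigableSet answers directly.
            Integer newestNotAfter = cachedVersions.floor(target);  // 5
            Integer oldestAfter = cachedVersions.higher(target);    // 8

            System.out.println(newestNotAfter + " / " + oldestAfter);
        }
    }
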
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index b1ebd2c..d0bdd6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -86,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Cac
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandLegacyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
@@ -231,14 +231,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private final ConcurrentNavigableMap<AffinityTopologyVersion, AffinityTopologyVersion> lastAffTopVers =
         new ConcurrentSkipListMap<>();
 
-    /**
-     * Latest started rebalance topology version but possibly not finished yet. Value {@code NONE}
-     * means that previous rebalance is undefined and the new one should be initiated.
-     *
-     * Should not be used to determine latest rebalanced topology.
-     */
-    private volatile AffinityTopologyVersion rebTopVer = NONE;
-
     /** */
     private GridFutureAdapter<?> reconnectExchangeFut;
 
@@ -1020,13 +1012,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @return Latest rebalance topology version or {@code NONE} if there is no info.
-     */
-    public AffinityTopologyVersion rebalanceTopologyVersion() {
-        return rebTopVer;
-    }
-
-    /**
      * @return Last initialized topology future.
      */
     public GridDhtPartitionsExchangeFuture lastTopologyFuture() {
@@ -1944,8 +1929,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-                    if (grp != null &&
-                        grp.localStartVersion().compareTo(entry.getValue().topologyVersion()) > 0)
+                    if (grp != null && !grp.topology().initialized())
                         continue;
 
                     GridDhtPartitionTopology top = null;
@@ -3341,13 +3325,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 if (grp.isLocal())
                                     continue;
 
-                                if (grp.preloader().rebalanceRequired(rebTopVer, exchFut))
-                                    rebTopVer = NONE;
-
                                 changed |= grp.topology().afterExchange(exchFut);
                             }
 
-                            if (!cctx.kernalContext().clientNode() && changed && !hasPendingServerExchange()) {
+                            if (!cctx.kernalContext().clientNode() && changed) {
                                 if (log.isDebugEnabled())
                                     log.debug("Refresh partitions due to mapping was changed");
 
@@ -3355,11 +3336,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
-                        // Schedule rebalance if force rebalance or force reassign occurs.
-                        if (exchFut == null)
-                            rebTopVer = NONE;
-
-                        if (!cctx.kernalContext().clientNode() && rebTopVer.equals(NONE)) {
+                        if (rebalanceRequired(exchFut)) {
                             if (rebalanceDelay > 0)
                                 U.sleep(rebalanceDelay);
 
@@ -3393,7 +3370,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         busy = false;
                     }
 
-                    if (assignsMap != null && rebTopVer.equals(NONE)) {
+                    if (!F.isEmpty(assignsMap)) {
                         int size = assignsMap.size();
 
                         NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>();
@@ -3413,11 +3390,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             orderMap.get(order).add(grpId);
                         }
 
-                        Runnable r = null;
+                        RebalanceFuture r = null;
 
-                        List<String> rebList = new LinkedList<>();
+                        GridCompoundFuture<Boolean, Boolean> rebFut = new GridCompoundFuture<>();
 
-                        boolean assignsCancelled = false;
+                        ArrayList<String> rebList = new ArrayList<>(size);
 
                         GridCompoundFuture<Boolean, Boolean> forcedRebFut = null;
 
@@ -3430,14 +3407,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                                 GridDhtPreloaderAssignments assigns = assignsMap.get(grpId);
 
-                                if (assigns != null)
-                                    assignsCancelled |= assigns.cancelled();
-
-                                Runnable cur = grp.preloader().addAssignments(assigns,
+                                RebalanceFuture cur = grp.preloader().addAssignments(assigns,
                                     forcePreload,
                                     cnt,
                                     r,
-                                    forcedRebFut);
+                                    forcedRebFut,
+                                    rebFut);
 
                                 if (cur != null) {
                                     rebList.add(grp.cacheOrGroupName());
@@ -3447,42 +3422,45 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             }
                         }
 
+                        rebFut.markInitialized();
+
                         if (forcedRebFut != null)
                             forcedRebFut.markInitialized();
 
-                        if (assignsCancelled || hasPendingServerExchange()) {
-                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                "[top=" + resVer + ", evt=" + exchId.discoveryEventName() +
-                                ", node=" + exchId.nodeId() + ']');
-                        }
-                        else if (r != null) {
+                        if (r != null) {
                             Collections.reverse(rebList);
 
-                            U.log(log, "Rebalancing scheduled [order=" + rebList +
-                                ", top=" + resVer + ", force=" + (exchFut == null) +
-                                ", evt=" + exchId.discoveryEventName() +
-                                ", node=" + exchId.nodeId() + ']');
-
-                            rebTopVer = resVer;
+                            RebalanceFuture finalR = r;
 
+                            // Waits until compatible rebalances are finished.
                             // Start rebalancing cache groups chain. Each group will be rebalanced
                             // sequentially one by one e.g.:
                             // ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3
-                            r.run();
+                            rebFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                                @Override public void apply(IgniteInternalFuture<Boolean> f) {
+                                    U.log(log, "Rebalancing scheduled [order=" + rebList +
+                                        ", top=" + finalR.topologyVersion() +
+                                        ", evt=" + exchId.discoveryEventName() +
+                                        ", node=" + exchId.nodeId() + ']');
+
+                                    finalR.requestPartitions();
+                                }
+                            });
                         }
-                        else
+                        else {
                             U.log(log, "Skipping rebalancing (nothing scheduled) " +
                                 "[top=" + resVer + ", force=" + (exchFut == null) +
                                 ", evt=" + exchId.discoveryEventName() +
                                 ", node=" + exchId.nodeId() + ']');
+                        }
                     }
-                    else
+                    else {
                         U.log(log, "Skipping rebalancing (no affinity changes) " +
                             "[top=" + resVer +
-                            ", rebTopVer=" + rebTopVer +
                             ", evt=" + exchId.discoveryEventName() +
                             ", evtNode=" + exchId.nodeId() +
                             ", client=" + cctx.kernalContext().clientNode() + ']');
+                    }
                 }
                 catch (IgniteInterruptedCheckedException e) {
                     throw e;
@@ -3511,6 +3489,32 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
             }
         }
+
+        /**
+         * Rebalance is not required on a client node and is always required when the exchange future is null.
+         * In other cases, this method checks all caches and decides whether rebalancing is required or not
+         * for the specific exchange.
+         *
+         * @param exchFut Exchange future.
+         * @return {@code True} if rebalance is required at least for one of cache groups.
+         */
+        private boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
+            if (cctx.kernalContext().clientNode())
+                return false;
+
+            if (exchFut == null)
+                return true;
+
+            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                if (grp.isLocal())
+                    continue;
+
+                if (grp.preloader().rebalanceRequired(exchFut))
+                    return true;
+            }
+
+            return false;
+        }
     }
 
     /**
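
To summarize the rewritten exchange-thread logic: instead of a Runnable chain, addAssignments() now returns RebalanceFuture objects linked via 'next', already-running compatible rebalances are gathered into the rebFut compound future, and requestPartitions() is invoked on the head of the chain only after that compound future completes. A rough, generic sketch of the same "wait for compatible in-flight work, then start the head of the chain" pattern using standard CompletableFuture (names are illustrative, not Ignite APIs):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    public class RebalanceChainSketch {
        public static void main(String[] args) {
            // In-flight rebalances that the new assignments are compatible with.
            List<CompletableFuture<Boolean>> compatible =
                Arrays.asList(CompletableFuture.completedFuture(true));

            // Head of the newly built rebalance chain.
            CompletableFuture<Boolean> head = new CompletableFuture<>();

            // Analogue of rebFut.markInitialized() + rebFut.listen(...): start the head
            // only after every compatible in-flight rebalance has finished.
            CompletableFuture
                .allOf(compatible.toArray(new CompletableFuture[0]))
                .thenRun(() -> {
                    System.out.println("Rebalancing scheduled, requesting partitions for the head group");
                    head.complete(true); // stands in for finalR.requestPartitions()
                });
        }
    }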
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 1b6aca1..c70f86b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -65,11 +66,10 @@ public interface GridCachePreloader {
     public void onInitialExchangeComplete(@Nullable Throwable err);
 
     /**
-     * @param rebTopVer Previous rebalance topology version or {@code NONE} if there is no info.
      * @param exchFut Completed exchange future.
      * @return {@code True} if rebalance should be started (previous will be interrupted).
      */
-    public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer, GridDhtPartitionsExchangeFuture exchFut);
+    public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * @param exchId Exchange ID.
@@ -85,15 +85,17 @@ public interface GridCachePreloader {
      * @param assignments Assignments to add.
      * @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}.
      * @param rebalanceId Rebalance id created by exchange thread.
-     * @param next Runnable responsible for cache rebalancing chain.
+     * @param next Rebalance future that follows the current one in the chain.
      * @param forcedRebFut External future for forced rebalance.
-     * @return Rebalancing runnable.
+     * @param compatibleRebFut Future for waiting for compatible rebalances.
+     * @return Rebalance future if rebalancing was planned, or {@code null} otherwise.
      */
-    public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+    public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
         long rebalanceId,
-        Runnable next,
-        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut);
+        final RebalanceFuture next,
+        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+        GridCompoundFuture<Boolean, Boolean> compatibleRebFut);
 
     /**
      * @return Future which will complete when preloader is safe to use.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index ff5e321..b382ba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
@@ -138,8 +139,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer,
-        GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
         return true;
     }
 
@@ -150,11 +150,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
+    @Override public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments,
         boolean forcePreload,
         long rebalanceId,
-        Runnable next,
-        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut) {
+        RebalanceFuture next,
+        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+        GridCompoundFuture<Boolean, Boolean> compatibleRebFut) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index daf0f26..2bc4fe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1630,7 +1630,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 U.error(log, "Failed to update partition counter. " +
                     "Most probably a node with most actual data is out of topology or data streamer is used " +
                     "in preload mode (allowOverride=false) concurrently with cache transactions [grpName=" +
-                    grp.name() + ", partId=" + partId + ']', e);
+                    grp.cacheOrGroupName() + ", partId=" + partId + ']', e);
 
                 if (failNodeOnPartitionInconsistency)
                     ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 3164a1e..82ce336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointProgress;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
@@ -121,7 +122,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
     private boolean disconnected;
 
     /** Holder for groups with temporary disabled WAL. */
-    private volatile TemporaryDisabledWal tmpDisabledWal;
+    private final TemporaryDisabledWal tmpDisabledWal = new TemporaryDisabledWal();
 
     /** */
     private volatile WALDisableContext walDisableContext;
@@ -406,14 +407,21 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
      * in OWNING state if such feature is enabled.
      *
      * @param topVer Topology version.
-     * @param changedBaseline The exchange is caused by Baseline Topology change.
+     * @param exchFut Exchange future.
      */
-    public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) {
-        if (changedBaseline
+    public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, GridDhtPartitionsExchangeFuture exchFut) {
+        if (exchFut.changedBaseline()
             && IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED)
             || !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, true))
             return;
 
+        ExchangeActions actions = exchFut.exchangeActions();
+
+        if (actions != null && !F.isEmpty(actions.cacheGroupsToStop())) {
+            for (ExchangeActions.CacheGroupActionData grpActionData : actions.cacheGroupsToStop())
+                onGroupRebalanceFinished(grpActionData.descriptor().groupId());
+        }
+
         Set<Integer> grpsToEnableWal = new HashSet<>();
         Set<Integer> grpsToDisableWal = new HashSet<>();
         Set<Integer> grpsWithWalDisabled = new HashSet<>();
@@ -456,17 +464,10 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
 
             if (hasOwning && !grp.localWalEnabled())
                 grpsToEnableWal.add(grp.groupId());
-            else if (hasMoving && !hasOwning && grp.localWalEnabled()) {
+            else if (hasMoving && !hasOwning && grp.localWalEnabled())
                 grpsToDisableWal.add(grp.groupId());
-
-                grpsWithWalDisabled.add(grp.groupId());
-            }
-            else if (!grp.localWalEnabled())
-                grpsWithWalDisabled.add(grp.groupId());
         }
 
-        tmpDisabledWal = new TemporaryDisabledWal(grpsWithWalDisabled, topVer);
-
         if (grpsToEnableWal.isEmpty() && grpsToDisableWal.isEmpty())
             return;
 
@@ -481,78 +482,64 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
         for (Integer grpId : grpsToEnableWal)
             cctx.cache().cacheGroup(grpId).localWalEnabled(true, true);
 
-        for (Integer grpId : grpsToDisableWal)
-            cctx.cache().cacheGroup(grpId).localWalEnabled(false, true);
+        tmpDisabledWal.disable(grpsToDisableWal);
     }
 
     /**
      * Callback when group rebalancing is finished. If there are no pending groups, it should trigger checkpoint and
      * change partition states.
      * @param grpId Group ID.
-     * @param topVer Topology version.
      */
-    public void onGroupRebalanceFinished(int grpId, AffinityTopologyVersion topVer) {
-        TemporaryDisabledWal session0 = tmpDisabledWal;
+    public void onGroupRebalanceFinished(int grpId) {
+        Set<Integer> groupsToEnable = tmpDisabledWal.enable(grpId);
 
-        if (session0 == null || session0.topVer.compareTo(topVer) > 0)
+        if (F.isEmpty(groupsToEnable))
             return;
 
-        session0.remainingGrps.remove(grpId);
-
-        if (session0.remainingGrps.isEmpty()) {
-            synchronized (mux) {
-                if (tmpDisabledWal != session0)
-                    return;
-
-                for (Integer grpId0 : session0.disabledGrps) {
-                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
-
-                    assert grp != null;
-
-                    if (!grp.localWalEnabled())
-                        grp.localWalEnabled(true, false);
-                }
+        grpId = F.first(groupsToEnable);
 
-                tmpDisabledWal = null;
-            }
+        CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
 
-            // Pending updates in groups with disabled WAL are not protected from crash.
-            // Need to trigger checkpoint for attempt to persist them.
-            CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + topVer);
+        assert grp != null : "Can not find group with id: " + grpId;
 
-            assert cpFut != null;
+        AffinityTopologyVersion lastGroupTop = grp.topology().readyTopologyVersion();
 
-            // It's safe to switch partitions to owning state only if checkpoint was successfully finished.
-            cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
-                @Override public void applyx(IgniteInternalFuture future) {
-                    if (X.hasCause(future.error(), NodeStoppingException.class))
-                        return;
+        // Pending updates in groups with disabled WAL are not protected from crash.
+        // Need to trigger checkpoint for attempt to persist them.
+        CheckpointProgress cpFut = triggerCheckpoint("wal-local-state-changed-rebalance-finished-" + lastGroupTop);
 
-                    for (Integer grpId0 : session0.disabledGrps) {
-                        try {
-                            cctx.database().walEnabled(grpId0, true, true);
-                        }
-                        catch (Exception e) {
-                            if (!X.hasCause(e, NodeStoppingException.class))
-                                throw e;
-                        }
+        assert cpFut != null;
 
-                        CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
+        // It's safe to switch partitions to owning state only if checkpoint was successfully finished.
+        cpFut.futureFor(FINISHED).listen(new IgniteInClosureX<IgniteInternalFuture>() {
+            @Override public void applyx(IgniteInternalFuture future) {
+                if (X.hasCause(future.error(), NodeStoppingException.class))
+                    return;
 
-                        if (grp != null)
-                            grp.topology().ownMoving(topVer);
-                        else if (log.isDebugEnabled())
-                            log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
+                for (Integer grpId0 : groupsToEnable) {
+                    try {
+                        cctx.database().walEnabled(grpId0, true, true);
+                    }
+                    catch (Exception e) {
+                        if (!X.hasCause(e, NodeStoppingException.class))
+                            throw e;
                     }
 
-                    if (log.isDebugEnabled())
-                        log.debug("Refresh partitions due to rebalance finished");
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
 
-                    // Trigger exchange for switching to ideal assignment when all nodes are ready.
-                    cctx.exchange().refreshPartitions();
+                    if (grp != null)
+                        grp.topology().ownMoving(lastGroupTop);
+                    else if (log.isDebugEnabled())
+                        log.debug("Cache group was destroyed before checkpoint finished, [grpId=" + grpId0 + ']');
                 }
-            });
-        }
+
+                if (log.isDebugEnabled())
+                    log.debug("Refresh partitions due to rebalance finished");
+
+                // Trigger exchange for switching to ideal assignment when all nodes are ready.
+                cctx.exchange().refreshPartitions();
+            }
+        });
     }
 
     /**
@@ -1183,29 +1170,66 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
     /**
      *
      */
-    private static class TemporaryDisabledWal {
+    private class TemporaryDisabledWal {
         /** Groups with disabled WAL. */
-        private final Set<Integer> disabledGrps;
+        private final Set<Integer> disabledGrps = new HashSet<>();
 
         /** Remaining groups. */
-        private final Set<Integer> remainingGrps;
+        private final Set<Integer> remainingGrps = new HashSet<>();
 
-        /** Topology version*/
-        private final AffinityTopologyVersion topVer;
+        /**
+         * Disables WAL for the specified groups.
+         *
+         * @param disabledGrps Set of group IDs for which WAL should be disabled.
+         */
+        public synchronized void disable(Set<Integer> disabledGrps) {
+            this.disabledGrps.addAll(disabledGrps);
+            this.remainingGrps.addAll(disabledGrps);
 
-        /** */
-        public TemporaryDisabledWal(
-            Set<Integer> disabledGrps,
-            AffinityTopologyVersion topVer
-        ) {
-            this.disabledGrps = Collections.unmodifiableSet(disabledGrps);
-            this.remainingGrps = new HashSet<>(disabledGrps);
-            this.topVer = topVer;
+            for (Integer grpId : disabledGrps)
+                cctx.cache().cacheGroup(grpId).localWalEnabled(false, true);
+        }
+
+        /**
+         * Marks the given group as finished so its WAL can be enabled again.
+         * Once the last temporarily disabled group finishes, WAL is enabled
+         * locally for all of them and the returned set is non-empty.
+         *
+         * @param grpId Group id.
+         * @return Set of groups for which WAL was locally enabled.
+         */
+        public synchronized Set<Integer> enable(int grpId) {
+            remainingGrps.remove(grpId);
+
+            if (remainingGrps.isEmpty()) {
+                HashSet<Integer> walEnablingGrps = new HashSet<>(disabledGrps.size());
+
+                for (Integer grpId0 : disabledGrps) {
+                    CacheGroupContext grp = cctx.cache().cacheGroup(grpId0);
+
+                    if (grp == null) {
+                        log.warning("Group was stopped, WAL state change is not needed [id=" + grpId0 + "]");
+
+                        continue;
+                    }
+
+                    if (!grp.localWalEnabled())
+                        grp.localWalEnabled(true, false);
+
+                    walEnablingGrps.add(grpId0);
+                }
+
+                disabledGrps.clear();
+
+                return walEnablingGrps;
+            }
+
+            return Collections.EMPTY_SET;
         }
     }
 
     /**
-     *
+     * Temporary storage for groups with disabled WAL.
      */
     public static class WALDisableContext implements MetastorageLifecycleListener {
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 918cfc9..db5cc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,6 +45,9 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     /** */
     public static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.4.4");
 
+    /** Cache rebalance topic. */
+    private static final Object REBALANCE_TOPIC = GridCachePartitionExchangeManager.rebalanceTopic(0);
+
     /** Rebalance id. */
     private long rebalanceId;
 
@@ -56,7 +60,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
 
     /** Topic. */
     @GridDirectTransient
-    private Object topic;
+    private Object topic = REBALANCE_TOPIC;
 
     /** Serialized topic. */
     private byte[] topicBytes;
@@ -76,11 +80,21 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
      * @param grpId Cache group ID.
      */
     GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId) {
+        this(rebalanceId, topVer, grpId, new IgniteDhtDemandedPartitionsMap());
+    }
+
+    /**
+     * @param rebalanceId Rebalance id for this node.
+     * @param topVer Topology version.
+     * @param grpId Cache group ID.
+     * @param parts Demand partition map.
+     */
+    GridDhtPartitionDemandMessage(long rebalanceId, @NotNull AffinityTopologyVersion topVer, int grpId,
+        IgniteDhtDemandedPartitionsMap parts) {
         this.grpId = grpId;
         this.rebalanceId = rebalanceId;
         this.topVer = topVer;
-
-        parts = new IgniteDhtDemandedPartitionsMap();
+        this.parts = parts;
     }
 
     /**
@@ -178,15 +192,6 @@ public class GridDhtPartitionDemandMessage extends GridCacheGroupIdMessage {
     }
 
     /**
-     * @param topic Topic.
-     * @deprecated Obsolete (Kept to solve compatibility issues).
-     */
-    @Deprecated
-    void topic(Object topic) {
-        this.topic = topic;
-    }
-
-    /**
      * @return Worker ID.
      */
     int workerId() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index ca768fc..2c64e6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -24,13 +24,16 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -117,9 +120,6 @@ public class GridDhtPartitionDemander {
     /** Last exchange future. */
     private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
 
-    /** Cache rebalance topic. */
-    private final Object rebalanceTopic;
-
     /** Rebalancing last cancelled time. */
     private final AtomicLong lastCancelledTime = new AtomicLong(-1);
 
@@ -147,8 +147,6 @@ public class GridDhtPartitionDemander {
 
         Map<Integer, Object> tops = new HashMap<>();
 
-        rebalanceTopic = GridCachePartitionExchangeManager.rebalanceTopic(0);
-
         String metricGroupName = metricName(CACHE_GROUP_METRICS_PREFIX, grp.cacheOrGroupName());
 
         MetricRegistry mreg = grp.shared().kernalContext().metric().registry(metricGroupName);
@@ -256,16 +254,6 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * @param fut Future.
-     * @return {@code True} if rebalance topology version changed by exchange thread or force
-     * reassing exchange occurs, see {@link RebalanceReassignExchangeTask} for details.
-     */
-    private boolean topologyChanged(RebalanceFuture fut) {
-        return !ctx.exchange().rebalanceTopologyVersion().equals(fut.topVer) ||
-            fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
-    }
-
-    /**
      * Sets last exchange future.
      *
      * @param lastFut Last future to set.
@@ -289,16 +277,19 @@ public class GridDhtPartitionDemander {
      * @param assignments Assignments to process.
      * @param force {@code True} if preload request by {@link ForceRebalanceExchangeTask}.
      * @param rebalanceId Rebalance id generated from exchange thread.
-     * @param next Runnable responsible for cache rebalancing chain.
+     * @param next Next rebalance routine in the chain.
      * @param forcedRebFut External future for forced rebalance.
-     * @return Rebalancing runnable.
+     * @param compatibleRebFut Future for waiting for compatible rebalances.
+     *
+     * @return Rebalancing future or {@code null} to exclude an assignment from a chain.
      */
-    Runnable addAssignments(
+    @Nullable RebalanceFuture addAssignments(
         final GridDhtPreloaderAssignments assignments,
         boolean force,
         long rebalanceId,
-        final Runnable next,
-        @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut
+        final RebalanceFuture next,
+        @Nullable final GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+        GridCompoundFuture<Boolean, Boolean> compatibleRebFut
     ) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assignments);
@@ -310,21 +301,62 @@ public class GridDhtPartitionDemander {
         if ((delay == 0 || force) && assignments != null) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, lastCancelledTime);
+            if (assignments.cancelled()) { // Pending exchange.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to cancelled assignments.");
+
+                return null;
+            }
+
+            if (assignments.isEmpty()) { // Nothing to rebalance.
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing skipped due to empty assignments.");
+
+                if (oldFut.isInitial())
+                    oldFut.onDone(true);
+
+                ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
+
+                ctx.exchange().scheduleResendPartitions();
+
+                return null;
+            }
+
+            if (!force && (!oldFut.isDone() || oldFut.result()) && oldFut.compatibleWith(assignments)) {
+                if (!oldFut.isDone())
+                    compatibleRebFut.add(oldFut);
 
-            if (!grp.localWalEnabled())
+                return null;
+            }
+
+            final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime);
+
+            if (!grp.localWalEnabled()) {
                 fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
                     @Override public void applyx(IgniteInternalFuture<Boolean> future) throws IgniteCheckedException {
                         if (future.get())
-                            ctx.walState().onGroupRebalanceFinished(grp.groupId(), assignments.topologyVersion());
+                            ctx.walState().onGroupRebalanceFinished(grp.groupId());
                     }
                 });
+            }
 
             if (!oldFut.isInitial())
-                oldFut.cancel();
+                oldFut.tryCancel();
             else
                 fut.listen(f -> oldFut.onDone(f.result()));
 
+            // Make sure partitions scheduled for full rebalancing are first cleared.
+            if (grp.persistenceEnabled()) {
+                for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
+                    for (Integer partId : e.getValue().partitions().fullSet()) {
+                        GridDhtLocalPartition part = grp.topology().localPartition(partId);
+
+                        if (part != null && part.state() == MOVING)
+                            part.clearAsync();
+                    }
+                }
+            }
+
             if (forcedRebFut != null)
                 forcedRebFut.add(fut);
 
@@ -356,45 +388,7 @@ public class GridDhtPartitionDemander {
 
             fut.sendRebalanceStartedEvent();
 
-            if (assignments.cancelled()) { // Pending exchange.
-                if (log.isDebugEnabled())
-                    log.debug("Rebalancing skipped due to cancelled assignments.");
-
-                fut.onDone(false);
-
-                fut.sendRebalanceFinishedEvent();
-
-                return null;
-            }
-
-            if (assignments.isEmpty()) { // Nothing to rebalance.
-                if (log.isDebugEnabled())
-                    log.debug("Rebalancing skipped due to empty assignments.");
-
-                fut.onDone(true);
-
-                ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
-
-                fut.sendRebalanceFinishedEvent();
-
-                return null;
-            }
-
-            return () -> {
-                if (next != null)
-                    fut.listen(f -> {
-                        try {
-                            if (f.get()) // Not cancelled.
-                                next.run(); // Starts next cache rebalancing (according to the order).
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug(e.getMessage());
-                        }
-                    });
-
-                requestPartitions(fut, assignments);
-            };
+            return fut;
         }
         else if (delay > 0) {
             for (GridCacheContext cctx : grp.caches()) {
@@ -433,212 +427,12 @@ public class GridDhtPartitionDemander {
     }
 
     /**
-     * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes.
-     *
-     * For each node participating in rebalance process method distributes set of partitions for that node to several stripes (topics).
-     * It means that each stripe containing a subset of partitions can be processed in parallel.
-     * The number of stripes are controlled by {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property.
-     *
-     * Partitions that can be rebalanced using only WAL are called historical, others are called full.
-     *
-     * Before sending messages, method awaits partitions clearing for full partitions.
-     *
-     * @param fut Rebalance future.
-     * @param assignments Assignments.
-     */
-    private void requestPartitions(final RebalanceFuture fut, GridDhtPreloaderAssignments assignments) {
-        assert fut != null;
-
-        if (topologyChanged(fut)) {
-            fut.cancel();
-
-            return;
-        }
-
-        if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
-            if (log.isTraceEnabled())
-                log.trace("Cancel partition demand because rebalance disabled on current node.");
-
-            fut.cancel();
-
-            return;
-        }
-
-        final CacheConfiguration cfg = grp.config();
-
-        for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
-            final ClusterNode node = e.getKey();
-
-            GridDhtPartitionDemandMessage d = e.getValue();
-
-            final IgniteDhtDemandedPartitionsMap parts;
-
-            synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
-                if (fut.isDone())
-                    break;
-
-                if (rebalanceFut.startTime == -1)
-                    rebalanceFut.startTime = System.currentTimeMillis();
-
-                parts = fut.remaining.get(node.id());
-
-                U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
-                    + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
-                    + ", topVer=" + fut.topologyVersion() + "]");
-            }
-
-            if (!parts.isEmpty()) {
-                d.topic(rebalanceTopic);
-                d.rebalanceId(fut.rebalanceId);
-                d.timeout(grp.preloader().timeout());
-
-                IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(fut, d.partitions().fullSet());
-
-                // Start rebalancing after clearing full partitions is finished.
-                clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
-                    if (fut.isDone())
-                        return;
-
-                    try {
-                        if (log.isInfoEnabled())
-                            log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
-                                ", topVer=" + fut.topologyVersion() +
-                                ", supplier=" + node.id() +
-                                ", fullPartitions=" + S.compact(parts.fullSet()) +
-                                ", histPartitions=" + S.compact(parts.historicalSet()) + "]");
-
-                        ctx.io().sendOrderedMessage(node, rebalanceTopic,
-                            d.convertIfNeeded(node.version()), grp.ioPolicy(), d.timeout());
-
-                        // Cleanup required in case partitions demanded in parallel with cancellation.
-                        synchronized (fut) {
-                            if (fut.isDone())
-                                fut.cleanupRemoteContexts(node.id());
-                        }
-                    }
-                    catch (IgniteCheckedException e1) {
-                        ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
-
-                        if (cause != null)
-                            log.warning("Failed to send initial demand request to node. " + e1.getMessage());
-                        else
-                            log.error("Failed to send initial demand request to node.", e1);
-
-                        fut.cancel();
-                    }
-                    catch (Throwable th) {
-                        log.error("Runtime error caught during initial demand request sending.", th);
-
-                        fut.cancel();
-                    }
-                }, true));
-            }
-        }
-    }
-
-    /**
-     * Creates future which will be completed when all {@code fullPartitions} are cleared.
-     *
-     * @param fut Rebalance future.
-     * @param fullPartitions Set of full partitions need to be cleared.
-     * @return Future which will be completed when given partitions are cleared.
-     */
-    private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
-        final GridFutureAdapter clearAllFuture = new GridFutureAdapter();
-
-        if (fullPartitions.isEmpty()) {
-            clearAllFuture.onDone();
-
-            return clearAllFuture;
-        }
-
-        for (GridCacheContext cctx : grp.caches()) {
-            if (cctx.statisticsEnabled()) {
-                final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
-                metrics.rebalanceClearingPartitions(fullPartitions.size());
-            }
-        }
-
-        final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());
-
-        for (int partId : fullPartitions) {
-            if (fut.isDone()) {
-                clearAllFuture.onDone();
-
-                return clearAllFuture;
-            }
-
-            GridDhtLocalPartition part = grp.topology().localPartition(partId);
-
-            if (part != null && part.state() == MOVING) {
-                part.onClearFinished(f -> {
-                    if (!fut.isDone()) {
-                        // Cancel rebalance if partition clearing was failed.
-                        if (f.error() != null) {
-                            for (GridCacheContext cctx : grp.caches()) {
-                                if (cctx.statisticsEnabled()) {
-                                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
-                                    metrics.rebalanceClearingPartitions(0);
-                                }
-                            }
-
-                            log.error("Unable to await partition clearing " + part, f.error());
-
-                            fut.cancel();
-
-                            clearAllFuture.onDone(f.error());
-                        }
-                        else {
-                            int remaining = clearingPartitions.decrementAndGet();
-
-                            for (GridCacheContext cctx : grp.caches()) {
-                                if (cctx.statisticsEnabled()) {
-                                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
-                                    metrics.rebalanceClearingPartitions(remaining);
-                                }
-                            }
-
-                            if (log.isDebugEnabled())
-                                log.debug("Partition is ready for rebalance [grp=" + grp.cacheOrGroupName()
-                                    + ", p=" + part.id() + ", remaining=" + remaining + "]");
-
-                            if (remaining == 0)
-                                clearAllFuture.onDone();
-                        }
-                    }
-                    else
-                        clearAllFuture.onDone();
-                });
-            }
-            else {
-                int remaining = clearingPartitions.decrementAndGet();
-
-                for (GridCacheContext cctx : grp.caches()) {
-                    if (cctx.statisticsEnabled()) {
-                        final CacheMetricsImpl metrics = cctx.cache().metrics0();
-
-                        metrics.rebalanceClearingPartitions(remaining);
-                    }
-                }
-
-                if (remaining == 0)
-                    clearAllFuture.onDone();
-            }
-        }
-
-        return clearAllFuture;
-    }
-
-    /**
      * Enqueues supply message.
      */
     public void registerSupplyMessage(final UUID nodeId, final GridDhtPartitionSupplyMessage supplyMsg, final Runnable r) {
         final RebalanceFuture fut = rebalanceFut;
 
-        if (!topologyChanged(fut) && fut.isActual(supplyMsg.rebalanceId())) {
+        if (fut.isActual(supplyMsg.rebalanceId())) {
             boolean historical = false;
 
             for (Integer p : supplyMsg.infos().keySet()) {
@@ -679,6 +473,13 @@ public class GridDhtPartitionDemander {
         fut.cancelLock.readLock().lock();
 
         try {
+            if (fut.isDone()) {
+                if (log.isDebugEnabled())
+                    log.debug("Supply message ignored (rebalance completed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+
+                return;
+            }
+
             ClusterNode node = ctx.node(nodeId);
 
             if (node == null) {
@@ -689,7 +490,7 @@ public class GridDhtPartitionDemander {
             }
 
             // Topology already changed (for the future that supply message based on).
-            if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId())) {
+            if (!fut.isActual(supplyMsg.rebalanceId())) {
                 if (log.isDebugEnabled())
                     log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
 
@@ -845,12 +646,10 @@ public class GridDhtPartitionDemander {
 
                 d.timeout(grp.preloader().timeout());
 
-                d.topic(rebalanceTopic);
-
-                if (!topologyChanged(fut) && !fut.isDone()) {
+                if (!fut.isDone()) {
                     // Send demand message.
                     try {
-                        ctx.io().sendOrderedMessage(node, rebalanceTopic,
+                        ctx.io().sendOrderedMessage(node, d.topic(),
                             d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
 
                         if (log.isDebugEnabled())
@@ -865,7 +664,7 @@ public class GridDhtPartitionDemander {
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) +
-                            ", topChanged=" + topologyChanged(fut) + ", rebalanceFuture=" + fut + "]");
+                            ", rebalanceFuture=" + fut + "]");
                 }
             }
             catch (IgniteSpiException | IgniteCheckedException e) {
@@ -888,7 +687,7 @@ public class GridDhtPartitionDemander {
         int p,
         final UUID nodeId,
         final GridDhtPartitionSupplyMessage supplyMsg) {
-        if (topologyChanged(fut) || !fut.isActual(supplyMsg.rebalanceId()))
+        if (fut.isDone() || !fut.isActual(supplyMsg.rebalanceId()))
             return;
 
         long queued = fut.queued.get(p).sum();
@@ -1185,12 +984,33 @@ public class GridDhtPartitionDemander {
     }
 
     /**
+     * Internal states of rebalance future.
+     */
+    private enum RebalanceFutureState {
+        /** Init. */
+        INIT,
+
+        /** Started. */
+        STARTED,
+
+        /** Marked as cancelled. */
+        MARK_CANCELLED,
+    }
+
+    /**
      *
      */
     public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
+        /** State updater. */
+        private static final AtomicReferenceFieldUpdater<RebalanceFuture, RebalanceFutureState> STATE_UPD =
+            AtomicReferenceFieldUpdater.newUpdater(RebalanceFuture.class, RebalanceFutureState.class, "state");
+
         /** */
         private final GridCacheSharedContext<?, ?> ctx;
 
+        /** Internal state. */
+        private volatile RebalanceFutureState state = RebalanceFutureState.INIT;
+
         /** */
         private final CacheGroupContext grp;
 
@@ -1248,23 +1068,37 @@ public class GridDhtPartitionDemander {
         /** Rebalancing last cancelled time. */
         private final AtomicLong lastCancelledTime;
 
+        /** Next future in chain. */
+        private final RebalanceFuture next;
+
+        /** Assignment. */
+        private final GridDhtPreloaderAssignments assignments;
+
+        /** Partitions which have been scheduled for rebalance from specific supplier. */
+        private final Map<ClusterNode, Set<Integer>> rebalancingParts;
+
         /**
          * @param grp Cache group.
          * @param assignments Assignments.
          * @param log Logger.
-         *
-            @param rebalanceId Rebalance id.
+         * @param rebalanceId Rebalance id.
+         * @param next Next rebalance future.
+         * @param lastCancelledTime Cancelled time.
          */
         RebalanceFuture(
             CacheGroupContext grp,
             GridDhtPreloaderAssignments assignments,
             IgniteLogger log,
             long rebalanceId,
+            RebalanceFuture next,
             AtomicLong lastCancelledTime) {
             assert assignments != null;
 
+            this.rebalancingParts = U.newHashMap(assignments.size());
+            this.assignments = assignments;
             exchId = assignments.exchangeId();
             topVer = assignments.topologyVersion();
+            this.next = next;
 
             this.lastCancelledTime = lastCancelledTime;
 
@@ -1276,6 +1110,11 @@ public class GridDhtPartitionDemander {
 
                 partitionsLeft.addAndGet(v.partitions().size());
 
+                rebalancingParts.put(k, new HashSet<Integer>(v.partitions().size()) {{
+                    addAll(v.partitions().historicalSet());
+                    addAll(v.partitions().fullSet());
+                }});
+
                 historical.addAll(v.partitions().historicalSet());
 
                 Stream.concat(v.partitions().historicalSet().stream(), v.partitions().fullSet().stream())
@@ -1301,6 +1140,8 @@ public class GridDhtPartitionDemander {
          * Dummy future. Will be done by real one.
          */
         RebalanceFuture() {
+            this.rebalancingParts = null;
+            this.assignments = null;
             this.exchId = null;
             this.topVer = null;
             this.ctx = null;
@@ -1309,10 +1150,224 @@ public class GridDhtPartitionDemander {
             this.rebalanceId = -1;
             this.routines = 0;
             this.cancelLock = new ReentrantReadWriteLock();
+            this.next = null;
             this.lastCancelledTime = new AtomicLong();
         }
 
         /**
+         * Asynchronously sends initial demand messages formed from {@code assignments} and initiates supply-demand processes.
+         *
+         * For each node participating in the rebalance process, the method distributes that node's set of partitions across several stripes (topics).
+         * This means that each stripe, containing a subset of the partitions, can be processed in parallel.
+         * The number of stripes is controlled by the {@link IgniteConfiguration#getRebalanceThreadPoolSize()} property.
+         *
+         * Partitions that can be rebalanced using only WAL are called historical, others are called full.
+         *
+         * Before sending messages, the method awaits partition clearing for all full partitions.
+         */
+        public void requestPartitions() {
+            if (!STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.STARTED)) {
+                cancel();
+
+                return;
+            }
+
+            if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
+                if (log.isTraceEnabled())
+                    log.trace("Cancel partition demand because rebalance disabled on current node.");
+
+                cancel();
+
+                return;
+            }
+
+            if (isDone()) {
+                assert !result() : "Rebalance future was done, but partitions never requested [grp="
+                    + grp.cacheOrGroupName() + ", topVer=" + topologyVersion() + "]";
+
+                return;
+            }
+
+            final CacheConfiguration cfg = grp.config();
+
+            for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assignments.entrySet()) {
+                final ClusterNode node = e.getKey();
+
+                GridDhtPartitionDemandMessage d = e.getValue();
+
+                final IgniteDhtDemandedPartitionsMap parts;
+
+                synchronized (this) { // Synchronized to prevent consistency issues in case of parallel cancellation.
+                    if (isDone())
+                        return;
+
+                    if (startTime == -1)
+                        startTime = System.currentTimeMillis();
+
+                    parts = remaining.get(node.id());
+
+                    U.log(log, "Prepared rebalancing [grp=" + grp.cacheOrGroupName()
+                        + ", mode=" + cfg.getRebalanceMode() + ", supplier=" + node.id() + ", partitionsCount=" + parts.size()
+                        + ", topVer=" + topologyVersion() + "]");
+                }
+
+                if (!parts.isEmpty()) {
+                    d.rebalanceId(rebalanceId);
+                    d.timeout(grp.preloader().timeout());
+
+                    IgniteInternalFuture<?> clearAllFuture = clearFullPartitions(this, d.partitions().fullSet());
+
+                    // Start rebalancing after clearing full partitions is finished.
+                    clearAllFuture.listen(f -> ctx.kernalContext().closure().runLocalSafe(() -> {
+                        if (isDone())
+                            return;
+
+                        try {
+                            if (log.isInfoEnabled())
+                                log.info("Starting rebalance routine [" + grp.cacheOrGroupName() +
+                                    ", topVer=" + topologyVersion() +
+                                    ", supplier=" + node.id() +
+                                    ", fullPartitions=" + S.compact(parts.fullSet()) +
+                                    ", histPartitions=" + S.compact(parts.historicalSet()) + "]");
+
+                            ctx.io().sendOrderedMessage(node, d.topic(),
+                                d.convertIfNeeded(node.version()), grp.ioPolicy(), d.timeout());
+
+                            // Cleanup is required in case partitions were demanded in parallel with cancellation.
+                            synchronized (this) {
+                                if (isDone())
+                                    cleanupRemoteContexts(node.id());
+                            }
+                        }
+                        catch (IgniteCheckedException e1) {
+                            ClusterTopologyCheckedException cause = e1.getCause(ClusterTopologyCheckedException.class);
+
+                            if (cause != null)
+                                log.warning("Failed to send initial demand request to node. " + e1.getMessage());
+                            else
+                                log.error("Failed to send initial demand request to node.", e1);
+
+                            cancel();
+                        }
+                        catch (Throwable th) {
+                            log.error("Runtime error caught during initial demand request sending.", th);
+
+                            cancel();
+                        }
+                    }, true));
+                }
+            }
+        }
+
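
The javadoc above ties the number of demand stripes to IgniteConfiguration#getRebalanceThreadPoolSize(). A minimal configuration sketch of that knob, assuming an arbitrary cache name "example" and a pool size of 4 (illustrative only, not part of this patch):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheRebalanceMode;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;

    public class RebalanceStripesSketch {
        public static void main(String[] args) {
            IgniteConfiguration cfg = new IgniteConfiguration()
                // Each supplier's partition set is split across up to this many topics (stripes).
                .setRebalanceThreadPoolSize(4)
                .setCacheConfiguration(new CacheConfiguration<Integer, String>("example")
                    .setRebalanceMode(CacheRebalanceMode.ASYNC)
                    .setBackups(1));

            try (Ignite ignite = Ignition.start(cfg)) {
                // A joining node will run up to 4 parallel demand routines per cache group.
            }
        }
    }
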
+        /**
+         * Creates future which will be completed when all {@code fullPartitions} are cleared.
+         *
+         * @param fut Rebalance future.
+         * @param fullPartitions Set of full partitions need to be cleared.
+         * @return Future which will be completed when given partitions are cleared.
+         */
+        private IgniteInternalFuture<?> clearFullPartitions(RebalanceFuture fut, Set<Integer> fullPartitions) {
+            final GridFutureAdapter clearAllFuture = new GridFutureAdapter();
+
+            if (fullPartitions.isEmpty()) {
+                clearAllFuture.onDone();
+
+                return clearAllFuture;
+            }
+
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.statisticsEnabled()) {
+                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                    metrics.rebalanceClearingPartitions(fullPartitions.size());
+                }
+            }
+
+            final AtomicInteger clearingPartitions = new AtomicInteger(fullPartitions.size());
+
+            for (int partId : fullPartitions) {
+                if (fut.isDone()) {
+                    clearAllFuture.onDone();
+
+                    return clearAllFuture;
+                }
+
+                GridDhtLocalPartition part = grp.topology().localPartition(partId);
+
+                if (part != null && part.state() == MOVING) {
+                    part.onClearFinished(f -> {
+                        if (!fut.isDone()) {
+                            // Cancel rebalance if partition clearing was failed.
+                            if (f.error() != null) {
+                                for (GridCacheContext cctx : grp.caches()) {
+                                    if (cctx.statisticsEnabled()) {
+                                        final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                                        metrics.rebalanceClearingPartitions(0);
+                                    }
+                                }
+
+                                log.error("Unable to await partition clearing " + part, f.error());
+
+                                fut.cancel();
+
+                                clearAllFuture.onDone(f.error());
+                            }
+                            else {
+                                int remaining = clearingPartitions.decrementAndGet();
+
+                                for (GridCacheContext cctx : grp.caches()) {
+                                    if (cctx.statisticsEnabled()) {
+                                        final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                                        metrics.rebalanceClearingPartitions(remaining);
+                                    }
+                                }
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Partition is ready for rebalance [grp=" + grp.cacheOrGroupName()
+                                        + ", p=" + part.id() + ", remaining=" + remaining + "]");
+
+                                if (remaining == 0)
+                                    clearAllFuture.onDone();
+                            }
+                        }
+                        else
+                            clearAllFuture.onDone();
+                    });
+                }
+                else {
+                    int remaining = clearingPartitions.decrementAndGet();
+
+                    for (GridCacheContext cctx : grp.caches()) {
+                        if (cctx.statisticsEnabled()) {
+                            final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                            metrics.rebalanceClearingPartitions(remaining);
+                        }
+                    }
+
+                    if (remaining == 0)
+                        clearAllFuture.onDone();
+                }
+            }
+
+            return clearAllFuture;
+        }
+
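
clearFullPartitions() above is essentially a countdown: the aggregate future completes once every full partition has finished clearing, and fails fast if any clearing fails (cancelling the rebalance). The same pattern with plain JDK futures, as an illustrative sketch only (the patch itself uses GridFutureAdapter and per-partition onClearFinished callbacks):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    class CountdownSketch {
        /** Completes the returned future when all child futures complete, failing on the first error. */
        static CompletableFuture<Void> allCleared(List<CompletableFuture<Void>> parts) {
            CompletableFuture<Void> all = new CompletableFuture<>();

            if (parts.isEmpty()) {
                all.complete(null);

                return all;
            }

            AtomicInteger remaining = new AtomicInteger(parts.size());

            for (CompletableFuture<Void> p : parts) {
                p.whenComplete((res, err) -> {
                    if (err != null)
                        all.completeExceptionally(err); // Mirrors cancelling the rebalance on a clearing failure.
                    else if (remaining.decrementAndGet() == 0)
                        all.complete(null); // Last partition cleared.
                });
            }

            return all;
        }
    }
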
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                if (next != null)
+                    next.requestPartitions(); // Go to the next item in the chain if it exists.
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * @return Topology version.
          */
         public AffinityTopologyVersion topologyVersion() {
@@ -1335,6 +1390,16 @@ public class GridDhtPartitionDemander {
         }
 
         /**
+         * Cancels this future or marks it for cancellation ({@code RebalanceFutureState#MARK_CANCELLED}).
+         */
+        private void tryCancel() {
+            if (STATE_UPD.compareAndSet(this, RebalanceFutureState.INIT, RebalanceFutureState.MARK_CANCELLED))
+                return;
+
+            cancel();
+        }
+
+        /**
          * Cancels this future.
          *
          * @return {@code True}.
@@ -1389,37 +1454,33 @@ public class GridDhtPartitionDemander {
         /**
          * @param nodeId Node id.
          */
-        private void cancel(UUID nodeId) {
-            synchronized (this) {
-                if (isDone())
-                    return;
+        private synchronized void cancel(UUID nodeId) {
+            if (isDone())
+                return;
 
-                U.log(log, ("Cancelled rebalancing [grp=" + grp.cacheOrGroupName() +
-                    ", supplier=" + nodeId + ", topVer=" + topologyVersion() + ']'));
+            U.log(log, ("Cancelled rebalancing [grp=" + grp.cacheOrGroupName() +
+                ", supplier=" + nodeId + ", topVer=" + topologyVersion() + ']'));
 
-                cleanupRemoteContexts(nodeId);
+            cleanupRemoteContexts(nodeId);
 
-                remaining.remove(nodeId);
+            remaining.remove(nodeId);
 
-                onDone(false); // Finishing rebalance future as non completed.
+            onDone(false); // Finishing rebalance future as non completed.
 
-                checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
-            }
+            checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
         }
 
         /**
          * @param nodeId Node id.
          * @param p Partition id.
          */
-        private void partitionMissed(UUID nodeId, int p) {
-            synchronized (this) {
-                if (isDone())
-                    return;
+        private synchronized void partitionMissed(UUID nodeId, int p) {
+            if (isDone())
+                return;
 
-                missed.computeIfAbsent(nodeId, k -> new HashSet<>());
+            missed.computeIfAbsent(nodeId, k -> new HashSet<>());
 
-                missed.get(nodeId).add(p);
-            }
+            missed.get(nodeId).add(p);
         }
 
         /**
@@ -1443,8 +1504,6 @@ public class GridDhtPartitionDemander {
             try {
                 Object rebalanceTopic = GridCachePartitionExchangeManager.rebalanceTopic(0);
 
-                d.topic(rebalanceTopic);
-
                 ctx.io().sendOrderedMessage(node, rebalanceTopic,
                     d.convertIfNeeded(node.version()), grp.ioPolicy(), grp.preloader().timeout());
             }
@@ -1458,44 +1517,42 @@ public class GridDhtPartitionDemander {
          * @param nodeId Node id.
          * @param p Partition number.
          */
-        private void partitionDone(UUID nodeId, int p, boolean updateState) {
-            synchronized (this) {
-                if (updateState && grp.localWalEnabled())
-                    grp.topology().own(grp.topology().localPartition(p));
+        private synchronized void partitionDone(UUID nodeId, int p, boolean updateState) {
+            if (updateState && grp.localWalEnabled())
+                grp.topology().own(grp.topology().localPartition(p));
 
-                if (isDone())
-                    return;
+            if (isDone())
+                return;
 
-                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent());
 
-                IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
+            IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
 
-                assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
-                    ", part=" + p + "]";
+            assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+                ", part=" + p + "]";
 
-                boolean rmvd = parts.remove(p);
+            boolean rmvd = parts.remove(p);
 
-                assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
-                    ", part=" + p + ", left=" + parts + "]";
+            assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+                ", part=" + p + ", left=" + parts + "]";
 
                 if (rmvd)
                     partitionsLeft.decrementAndGet();
 
-                if (parts.isEmpty()) {
-                    int remainingRoutines = remaining.size() - 1;
+            if (parts.isEmpty()) {
+                int remainingRoutines = remaining.size() - 1;
 
-                    U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
-                        "rebalancing [grp=" + grp.cacheOrGroupName() +
-                        ", supplier=" + nodeId +
-                        ", topVer=" + topologyVersion() +
-                        ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
+                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                    "rebalancing [grp=" + grp.cacheOrGroupName() +
+                    ", supplier=" + nodeId +
+                    ", topVer=" + topologyVersion() +
+                    ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
 
-                    remaining.remove(nodeId);
-                }
-
-                checkIsDone();
+                remaining.remove(nodeId);
             }
+
+            checkIsDone();
         }
 
         /**
@@ -1538,7 +1595,8 @@ public class GridDhtPartitionDemander {
                     log.debug("Partitions have been scheduled to resend [reason=" +
                         "Rebalance is done [grp=" + grp.cacheOrGroupName() + "]");
 
-                ctx.exchange().scheduleResendPartitions();
+                if (!cancelled)
+                    ctx.exchange().scheduleResendPartitions();
 
                 Collection<Integer> m = new HashSet<>();
 
@@ -1587,6 +1645,87 @@ public class GridDhtPartitionDemander {
                 rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchId.discoveryEvent());
         }
 
+        /**
+         * @param otherAssignments Newest assignments.
+         *
+         * @return {@code True} if this future is compatible with the given assignments, {@code False} otherwise.
+         */
+        public boolean compatibleWith(GridDhtPreloaderAssignments otherAssignments) {
+            if (isInitial() || ((GridDhtPreloader)grp.preloader()).disableRebalancingCancellationOptimization())
+                return false;
+
+            if (ctx.exchange().lastAffinityChangedTopologyVersion(topVer).equals(
+                ctx.exchange().lastAffinityChangedTopologyVersion(otherAssignments.topologyVersion()))) {
+                if (log.isDebugEnabled())
+                    log.debug("Rebalancing is forced on the same topology [grp="
+                        + grp.cacheOrGroupName() + ", " + "top=" + topVer + ']');
+
+                return false;
+            }
+
+            if (otherAssignments.affinityReassign()) {
+                if (log.isDebugEnabled())
+                    log.debug("Some of owned partitions were reassigned through coordinator [grp="
+                        + grp.cacheOrGroupName() + ", " + "init=" + topVer + " ,other=" + otherAssignments.topologyVersion() + ']');
+
+                return false;
+            }
+
+            Set<Integer> p0 = new HashSet<>();
+            Set<Integer> p1 = new HashSet<>();
+
+            // Not compatible if a supplier has left.
+            for (ClusterNode node : rebalancingParts.keySet()) {
+                if (!grp.cacheObjectContext().kernalContext().discovery().alive(node))
+                    return false;
+            }
+
+            for (Set<Integer> partitions : rebalancingParts.values())
+                p0.addAll(partitions);
+
+            for (GridDhtPartitionDemandMessage message : otherAssignments.values()) {
+                p1.addAll(message.partitions().fullSet());
+                p1.addAll(message.partitions().historicalSet());
+            }
+
+            // Not compatible if not a subset.
+            if (!p0.containsAll(p1))
+                return false;
+
+            p1 = Stream.concat(grp.affinity().cachedAffinity(otherAssignments.topologyVersion())
+                .primaryPartitions(ctx.localNodeId()).stream(), grp.affinity()
+                .cachedAffinity(otherAssignments.topologyVersion()).backupPartitions(ctx.localNodeId()).stream())
+                .collect(Collectors.toSet());
+
+            NavigableSet<AffinityTopologyVersion> toCheck = grp.affinity().cachedVersions()
+                .headSet(otherAssignments.topologyVersion(), false);
+
+            if (!toCheck.contains(topVer)) {
+                log.warning("History is not enough for checking compatible last rebalance, new rebalance started " +
+                    "[grp=" + grp.cacheOrGroupName() + ", lastTop=" + topVer + ']');
+
+                return false;
+            }
+
+            for (AffinityTopologyVersion previousTopVer : toCheck.descendingSet()) {
+                if (previousTopVer.before(topVer))
+                    break;
+
+                if (!ctx.exchange().lastAffinityChangedTopologyVersion(previousTopVer).equals(previousTopVer))
+                    continue;
+
+                p0 = Stream.concat(grp.affinity().cachedAffinity(previousTopVer).primaryPartitions(ctx.localNodeId()).stream(),
+                    grp.affinity().cachedAffinity(previousTopVer).backupPartitions(ctx.localNodeId()).stream())
+                    .collect(Collectors.toSet());
+
+                // Not compatible if owners are different.
+                if (!p0.equals(p1))
+                    return false;
+            }
+
+            return true;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RebalanceFuture.class, this);
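
For reference, compatibleWith() above reduces to two set checks: the newly assigned partitions must be a subset of the partitions already being rebalanced, and this node's primary/backup set must be unchanged across the intermediate affinity versions. A condensed illustration with plain collections (an editorial sketch; the real method additionally checks supplier liveness, forced affinity reassignment and the depth of the cached affinity history):

    import java.util.Set;

    class CompatibilitySketch {
        /** True if the new assignment does not demand anything beyond the in-flight rebalance. */
        static boolean compatible(Set<Integer> alreadyRebalancing, Set<Integer> newlyAssigned,
            Set<Integer> ownedBefore, Set<Integer> ownedNow) {
            // Not compatible if the new assignment is not a subset of what is already being demanded.
            if (!alreadyRebalancing.containsAll(newlyAssigned))
                return false;

            // Not compatible if this node's primary/backup partitions changed between the topologies.
            return ownedBefore.equals(ownedNow);
        }
    }
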
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 0751cac..50d0677 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -377,6 +377,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     /** Specified only in case of 'cluster is fully rebalanced' state achieved. */
     private volatile RebalancedInfo rebalancedInfo;
 
+    /** Some partitions owned according to affinity were moved to MOVING state on this exchange. */
+    private volatile boolean affinityReassign;
+
     /**
      * @param cctx Cache context.
      * @param busyLock Busy lock.
@@ -2427,7 +2430,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 }
 
                 if (changedAffinity())
-                    cctx.walState().changeLocalStatesOnExchangeDone(res, changedBaseline());
+                    cctx.walState().changeLocalStatesOnExchangeDone(res, this);
             }
         }
         catch (Throwable t) {
@@ -3697,8 +3700,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     }
                 }
                 else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
-                    && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
+                    && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions()) {
+                    markAffinityReassign();
+
                     assignPartitionsStates();
+                }
             }
             else {
                 if (exchCtx.events().hasServerJoin())
@@ -4464,6 +4470,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap()))
                 cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
 
+            if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+                DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
+
+                if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
+                    && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
+                    markAffinityReassign();
+            }
+
             onDone(resTopVer, null);
         }
         catch (IgniteCheckedException e) {
@@ -5172,6 +5186,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Marks this future as affinity reassign.
+     */
+    public void markAffinityReassign() {
+        affinityReassign = true;
+    }
+
+    /**
+     * @return True if some owned partition was reassigned, false otherwise.
+     */
+    public boolean affinityReassign() {
+        return affinityReassign;
+    }
+
+    /**
      * Add or merge updates received from coordinator while exchange in progress.
      *
      * @param fullMsg Full message with exchangeId = null.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 9282ded..d34bebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -873,7 +873,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 ClusterNode newMapSentBy = discovery.node(updMap.nodeId());
 
                 if (newMapSentBy == null)
-                    return;
+                    continue;
 
                 if (currentMapSentBy == null || newMapSentBy.order() > currentMapSentBy.order() || updMap.compareTo(currMap) >= 0)
                     partitions().put(grpId, updMap);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 67ba018..8f0d390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -35,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -44,11 +46,11 @@ import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
 
@@ -59,6 +61,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Default preload resend timeout. */
     public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
 
+    /** Disable rebalancing cancellation optimization. */
+    private final boolean disableRebalancingCancellationOptimization = IgniteSystemProperties.getBoolean(
+        IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION);
+
     /** */
     private GridDhtPartitionTopology top;
 
@@ -129,6 +135,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
     }
 
+    /**
+     * @return Rebalance cancellation optimization flag.
+     */
+    public boolean disableRebalancingCancellationOptimization() {
+        return disableRebalancingCancellationOptimization;
+    }
+
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
         if (err == null)
@@ -145,34 +158,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer,
-        GridDhtPartitionsExchangeFuture exchFut) {
-        if (ctx.kernalContext().clientNode() || rebTopVer.equals(AffinityTopologyVersion.NONE))
+    @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) {
+        if (ctx.kernalContext().clientNode())
             return false; // No-op.
 
-        if (exchFut.resetLostPartitionFor(grp.cacheOrGroupName()))
-            return true;
-
-        if (exchFut.localJoinExchange())
-            return true; // Required, can have outdated updSeq partition counter if node reconnects.
-
-        if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
-            assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
-                "Empty history allowed only for newly started cache group [rebTopVer=" + rebTopVer +
-                    ", localStartTopVer=" + grp.localStartVersion() + ']';
-
-            return true; // Required, since no history info available.
-        }
-
-        final IgniteInternalFuture<Boolean> rebFut = rebalanceFuture();
-
-        if (rebFut.isDone() && !rebFut.result())
-            return true; // Required, previous rebalance cancelled.
-
         AffinityTopologyVersion lastAffChangeTopVer =
             ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
 
-        return lastAffChangeTopVer.compareTo(rebTopVer) > 0;
+        return lastAffChangeTopVer.equals(exchFut.topologyVersion());
     }
 
     /** {@inheritDoc} */
@@ -184,7 +177,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         GridDhtPartitionTopology top = grp.topology();
 
         if (!grp.rebalanceEnabled())
-            return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
+            return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion(), false);
 
         int partitions = grp.affinity().partitions();
 
@@ -195,7 +188,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 ", grp=" + grp.name() +
                 ", topVer=" + top.readyTopologyVersion() + ']';
 
-        GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer);
+        GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer,
+            exchFut != null && exchFut.affinityReassign());
 
         AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
@@ -245,8 +239,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     part.resetUpdateCounter();
                 }
 
-                assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
-
                 ClusterNode histSupplier = null;
 
                 if (grp.persistenceEnabled() && exchFut != null) {
@@ -274,11 +266,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                     msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
                 }
                 else {
-                    // If for some reason (for example if supplier fails and new supplier is elected) partition is
-                    // assigned for full rebalance force clearing if not yet set.
-                    if (grp.persistenceEnabled() && exchFut != null && !exchFut.isClearingPartition(grp, p))
-                        part.clearAsync();
-
                     List<ClusterNode> picked = remoteOwners(p, topVer);
 
                     if (picked.isEmpty()) {
@@ -380,14 +367,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Runnable addAssignments(
+    @Override public RebalanceFuture addAssignments(
         GridDhtPreloaderAssignments assignments,
         boolean forceRebalance,
         long rebalanceId,
-        Runnable next,
-        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
+        final RebalanceFuture next,
+        @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut,
+        GridCompoundFuture<Boolean, Boolean> compatibleRebFut
     ) {
-        return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut);
+        return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut, compatibleRebFut);
     }
 
     /**
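
The IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION property read above is a node-wide switch: when it is set to true, compatibleWith() short-circuits to false, so rebalancing is cancelled on topology change as before this patch. A sketch of enabling it programmatically before node start (passing it as a -D JVM argument should be equivalent; the helper class itself is illustrative):

    import org.apache.ignite.IgniteSystemProperties;

    class DisableRebalanceOptimizationSketch {
        static void disable() {
            // Must be set before the node starts: GridDhtPreloader reads the flag once, into a final field.
            System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, "true");
        }
    }
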
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
index 8783c73..acb1c52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java
@@ -39,16 +39,31 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode,
     /** */
     private boolean cancelled;
 
+    /** Some partitions owned according to affinity were moved to MOVING state. */
+    private final boolean affinityReassign;
+
     /**
      * @param exchangeId Exchange ID.
      * @param topVer Last join order.
      */
-    public GridDhtPreloaderAssignments(GridDhtPartitionExchangeId exchangeId, AffinityTopologyVersion topVer) {
+    public GridDhtPreloaderAssignments(
+        GridDhtPartitionExchangeId exchangeId,
+        AffinityTopologyVersion topVer,
+        boolean affinityReassign
+    ) {
         assert exchangeId != null;
         assert topVer.topologyVersion() > 0 : topVer;
 
         this.exchangeId = exchangeId;
         this.topVer = topVer;
+        this.affinityReassign = affinityReassign;
+    }
+
+    /**
+     * @return True if partitions were reassigned.
+     */
+    public boolean affinityReassign() {
+        return affinityReassign;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index e2fdf3c..2574c25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -713,9 +713,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
      * @return True if current partition map should be overwritten by new partition map, false in other case.
      */
     private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
-        return newMap != null &&
-            (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
-                newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence());
+        return newMap != null && newMap.compareTo(currentMap) > 0;
     }
 
     /** {@inheritDoc} */
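
Both shouldOverridePartitionMap() implementations now delegate to GridDhtPartitionMap's compareTo, which, as the replaced expressions show, orders maps by topology version first and by update sequence on a tie. The same ordering written out as a standalone comparator (illustrative only; MapVersion is a hypothetical stand-in for the real map's version fields):

    import java.util.Comparator;

    class PartitionMapOrderSketch {
        /** Hypothetical value object standing in for the ordering fields of a partition map. */
        static final class MapVersion {
            final long topVer;    // Topology version (major part only, for brevity).
            final long updateSeq; // Update sequence within that topology version.

            MapVersion(long topVer, long updateSeq) {
                this.topVer = topVer;
                this.updateSeq = updateSeq;
            }
        }

        /** Newer topology version wins; on a tie, the larger update sequence wins. */
        static final Comparator<MapVersion> ORDER =
            Comparator.<MapVersion>comparingLong(v -> v.topVer).thenComparingLong(v -> v.updateSeq);

        static boolean shouldOverride(MapVersion current, MapVersion candidate) {
            return candidate != null && ORDER.compare(candidate, current) > 0;
        }
    }
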
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 528bd63..a3cd8b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -787,20 +787,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                             log.debug("Will not own partition (there are owners to rebalance from) " +
                                                 "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
                                     }
-
-                                    // It's important to clear non empty moving partitions before full rebalancing.
-                                    // Consider the scenario:
-                                    // Node1 has keys k1 and k2 in the same partition.
-                                    // Node2 started rebalancing from Node1.
-                                    // Node2 received k1, k2 and failed before moving partition to OWNING state.
-                                    // Node1 removes k2 but update has not been delivered to Node1 because of failure.
-                                    // After new full rebalance Node1 will only send k1 to Node2 causing lost removal.
-                                    // NOTE: avoid calling clearAsync for partition twice per topology version.
-                                    if (grp.persistenceEnabled() &&
-                                            exchFut.isClearingPartition(grp, locPart.id()) &&
-                                            !locPart.isClearing() &&
-                                            !locPart.isEmpty())
-                                        locPart.clearAsync();
                                 }
                                 else
                                     updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
@@ -1418,10 +1404,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
      * @return True if current partition map should be overwritten by new partition map, false in other case
      */
     private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
-        return newMap != null &&
-            (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 ||
-                newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 &&
-                    newMap.updateSequence() > currentMap.updateSequence());
+        return newMap != null && newMap.compareTo(currentMap) > 0;
     }
 
     /** {@inheritDoc} */
@@ -1498,16 +1481,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                     }
                 }
 
-                if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) {
-                    U.warn(log, "Stale version for full partition map update message (will ignore) [" +
-                        "grp=" + grp.cacheOrGroupName() +
-                        ", lastTopChange=" + lastTopChangeVer +
-                        ", readTopVer=" + readyTopVer +
-                        ", msgVer=" + msgTopVer + ']');
-
-                    return false;
-                }
-
                 boolean fullMapUpdated = (node2part == null);
 
                 if (node2part != null) {
@@ -1519,7 +1492,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                             if (log.isDebugEnabled()) {
                                 log.debug("Overriding partition map in full update map [" +
-                                    "grp=" + grp.cacheOrGroupName() +
+                                    "node=" + part.nodeId() +
+                                    ", grp=" + grp.cacheOrGroupName() +
                                     ", exchVer=" + exchangeVer +
                                     ", curPart=" + mapString(part) +
                                     ", newPart=" + mapString(newPart) + ']');
@@ -1529,8 +1503,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                                 updateSeq.setIfGreater(newPart.updateSequence());
                         }
                         else {
-                            // If for some nodes current partition has a newer map,
-                            // then we keep the newer value.
+                            // If for some nodes current partition has a newer map, then we keep the newer value.
+                            if (log.isDebugEnabled()) {
+                                log.debug("Partitions map for the node keeps newer value than message [" +
+                                    "node=" + part.nodeId() +
+                                    ", grp=" + grp.cacheOrGroupName() +
+                                    ", exchVer=" + exchangeVer +
+                                    ", curPart=" + mapString(part) +
+                                    ", newPart=" + mapString(newPart) + ']');
+                            }
+
                             partMap.put(part.nodeId(), part);
                         }
                     }
@@ -2294,7 +2276,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
 
                 for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
                     int part = entry.getKey();
-                    Set<UUID> newOwners = entry.getValue();
+                    Set<UUID> maxCounterPartOwners = entry.getValue();
 
                     GridDhtLocalPartition locPart = localPartition(part);
 
@@ -2302,7 +2284,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         continue;
 
                     // Partition state should be mutated only on joining nodes if they are exists for the exchange.
-                    if (joinedNodes.isEmpty() && !newOwners.contains(locNodeId)) {
+                    if (joinedNodes.isEmpty() && !maxCounterPartOwners.contains(locNodeId)) {
                         rebalancePartition(part, !haveHist.contains(part), exchFut);
 
                         res.computeIfAbsent(locNodeId, n -> new HashSet<>()).add(part);
@@ -2312,7 +2294,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 // Then process remote partitions.
                 for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
                     int part = entry.getKey();
-                    Set<UUID> newOwners = entry.getValue();
+                    Set<UUID> maxCounterPartOwners = entry.getValue();
 
                     for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
                         UUID remoteNodeId = remotes.getKey();
@@ -2327,7 +2309,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (state != OWNING)
                             continue;
 
-                        if (!newOwners.contains(remoteNodeId)) {
+                        if (!maxCounterPartOwners.contains(remoteNodeId)) {
                             partMap.put(part, MOVING);
 
                             partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
@@ -2707,12 +2689,19 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         try {
             AffinityTopologyVersion lastAffChangeVer = ctx.exchange().lastAffinityChangedTopologyVersion(lastTopChangeVer);
 
-            if (lastAffChangeVer.compareTo(rebFinishedTopVer) > 0 && log.isInfoEnabled())
-                log.info("Affinity topology changed, no MOVING partitions will be owned " +
-                    "[rebFinishedTopVer=" + rebFinishedTopVer +
-                    ", lastAffChangeVer=" + lastAffChangeVer + "]");
+            if (lastAffChangeVer.compareTo(rebFinishedTopVer) > 0) {
+                if (log.isInfoEnabled()) {
+                    log.info("Affinity topology changed, no MOVING partitions will be owned " +
+                        "[rebFinishedTopVer=" + rebFinishedTopVer +
+                        ", lastAffChangeVer=" + lastAffChangeVer + "]");
+                }
+
+                grp.preloader().forceRebalance();
+
+                return;
+            }
 
-            for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
+            for (GridDhtLocalPartition locPart : currentLocalPartitions()) {
                 if (locPart.state() == MOVING) {
                     boolean reserved = locPart.reserve();
 
@@ -2720,7 +2709,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                         if (reserved && locPart.state() == MOVING &&
                             lastAffChangeVer.compareTo(rebFinishedTopVer) <= 0 &&
                             rebFinishedTopVer.compareTo(lastTopChangeVer) <= 0)
-                                grp.topology().own(locPart);
+                            own(locPart);
                     }
                     finally {
                         if (reserved)
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java b/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java
new file mode 100644
index 0000000..96df899
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/BreakRebalanceChainTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test checks what happens when the rebalance chain is broken into two parts.
+ */
+public class BreakRebalanceChainTest extends GridCommonAbstractTest {
+    /** Node name suffix. Used for {@link CustomNodeFilter}. */
+    public static final String FILTERED_NODE_SUFFIX = "_filtered";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()))
+            .setCacheConfiguration(
+                new CacheConfiguration(DEFAULT_CACHE_NAME + 1)
+                    .setAffinity(new RendezvousAffinityFunction(false, 15))
+                    .setNodeFilter(new CustomNodeFilter())
+                    .setRebalanceOrder(1)
+                    .setBackups(1),
+                new CacheConfiguration(DEFAULT_CACHE_NAME + 2)
+                    .setAffinity(new RendezvousAffinityFunction(false, 15))
+                    .setRebalanceOrder(2)
+                    .setBackups(1),
+                new CacheConfiguration(DEFAULT_CACHE_NAME + 3)
+                    .setAffinity(new RendezvousAffinityFunction(false, 15))
+                    .setNodeFilter(new CustomNodeFilter())
+                    .setRebalanceOrder(3)
+                    .setBackups(1));
+    }
+
+    /**
+     * Custom node filter. It filters out all nodes whose name contains {@link BreakRebalanceChainTest#FILTERED_NODE_SUFFIX}.
+     */
+    private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !node.consistentId().toString().contains(FILTERED_NODE_SUFFIX);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void test() throws Exception {
+        startGrids(2);
+
+        awaitPartitionMapExchange();
+
+        IgniteConfiguration cfg = optimize(getConfiguration(getTestIgniteInstanceName(2)));
+
+        TestRecordingCommunicationSpi communicationSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        ConcurrentHashMap<String, Long> rebalancingCaches = new ConcurrentHashMap<>();
+
+        communicationSpi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+                long rebId = U.field(demandMsg, "rebalanceId");
+
+                if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 1)) {
+                    rebalancingCaches.put(DEFAULT_CACHE_NAME + 1, rebId);
+
+                    return true;
+                }
+                else if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 2)) {
+                    rebalancingCaches.put(DEFAULT_CACHE_NAME + 2, rebId);
+
+                    return true;
+                }
+                else if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME + 3)) {
+                    rebalancingCaches.put(DEFAULT_CACHE_NAME + 3, rebId);
+
+                    return true;
+                }
+            }
+
+            return false;
+        });
+
+        startGrid(cfg);
+
+        communicationSpi.waitForBlocked();
+
+        assertEquals("Several parallel rebalace detected.", blockedDemand(rebalancingCaches), 1);
+
+        IgniteEx filteredNode = startGrid(getTestIgniteInstanceName(3) + FILTERED_NODE_SUFFIX);
+
+        IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(filteredNode);
+
+        for (IgniteInternalFuture fut : futs)
+            fut.get(10_000);
+
+        assertEquals("Several parallel rebalace detected.", blockedDemand(rebalancingCaches), 1);
+
+        communicationSpi.stopBlock();
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * @param rebalancingCaches Map of blocked demand messages for caches.
+     * @return Count of blocked messages.
+     */
+    private int blockedDemand(ConcurrentHashMap<String, Long> rebalancingCaches) {
+        int msgs = 0;
+
+        for (Map.Entry<String, Long> entry : rebalancingCaches.entrySet()) {
+            if (entry.getValue() > 0) {
+                msgs++;
+
+                log.info("Demand for partitions on cache " + entry.getKey() + " rebalance id " + entry.getValue());
+            }
+        }
+
+        return msgs;
+    }
+
+    /**
+     * Finds all existing rebalance futures of all caches for the specified Ignite instance.
+     *
+     * @param ignite Ignite.
+     * @return Array of rebalance futures.
+     */
+    private IgniteInternalFuture<Boolean>[] getAllRebalanceFutures(IgniteEx ignite) {
+        IgniteInternalFuture<Boolean>[] futs = new IgniteInternalFuture[ignite.cacheNames().size()];
+
+        int i = 0;
+
+        for (String cache : ignite.cacheNames()) {
+            futs[i] = ignite.context().cache().cacheGroup(CU.cacheId(cache)).preloader().rebalanceFuture();
+
+            i++;
+        }
+        return futs;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java
new file mode 100644
index 0000000..5bc63d5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/NotOptimizedRebalanceTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
+
+/**
+ * Test checks rebalance behavior when several exchanges are triggered in a sequence.
+ */
+@WithSystemProperty(key = IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, value = "true")
+public class NotOptimizedRebalanceTest extends GridCommonAbstractTest {
+    /** Start cluster nodes. */
+    public static final int NODES_CNT = 3;
+
+    /** Persistence enabled. */
+    public boolean persistenceEnabled;
+
+    /** Count of backup partitions. */
+    public static final int BACKUPS = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setWalSegmentSize(4 * 1024 * 1024)
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(100L * 1024 * 1024)
+                    .setPersistenceEnabled(persistenceEnabled)))
+            .setCacheConfiguration(
+                new CacheConfiguration(DEFAULT_CACHE_NAME)
+                    .setAffinity(new RendezvousAffinityFunction(false, 15))
+                    .setBackups(BACKUPS));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks rebalance with persistence.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceWithPersistence() throws Exception {
+        testRebalance(true, true);
+    }
+
+    /**
+     * Checks rebalance without persistence.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceWithoutPersistence() throws Exception {
+        testRebalance(false, true);
+    }
+
+    /**
+     * Checks rebalance with persistence and client joining/leaving.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceWithPersistenceAndClient() throws Exception {
+        testRebalance(true, false);
+    }
+
+    /**
+     * Checks rebalance without persistence and client joining/leaving.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceWithoutPersistenceAndClient() throws Exception {
+        testRebalance(false, false);
+    }
+
+    /**
+     * Triggers rebalance when a node leaves the topology.
+     *
+     * @param persistence Persistence flag.
+     * @param serverJoin If {@code true} a server node joins/leaves the topology, otherwise a client node does.
+     * @throws Exception If failed.
+     */
+    public void testRebalance(boolean persistence, boolean serverJoin) throws Exception {
+        persistenceEnabled = persistence;
+
+        IgniteEx ignite0 = startGrids(NODES_CNT);
+
+        ignite0.cluster().active(true);
+
+        ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteEx newNode = serverJoin ? startGrid(NODES_CNT) : startClientGrid(NODES_CNT);
+
+        grid(1).close();
+
+        for (String cache : ignite0.cacheNames())
+            loadData(ignite0, cache);
+
+        awaitPartitionMapExchange();
+
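+        // Return node 1 to the topology with demand messages blocked, so its rebalance cannot complete yet.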
+        TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+        commSpi1.waitForBlocked();
+
+        Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs = getAllRebalanceFuturesByGroup(grid(1));
+
+        checkAllFuturesProcessing(futs);
+
+        for (int i = 0; i < 3; i++) {
+            newNode.close();
+
+            checkTopology(NODES_CNT);
+
+            newNode = serverJoin ? startGrid(NODES_CNT) : startClientGrid(NODES_CNT);
+
+            checkTopology(NODES_CNT + 1);
+        }
+
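+        // With the cancellation optimization disabled, server topology changes cancel the rebalance, client events do not.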
+        if (serverJoin)
+            checkAllFuturesCancelled(futs);
+        else
+            checkAllFuturesProcessing(futs);
+
+        commSpi1.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        Map<CacheGroupContext, IgniteInternalFuture<Boolean>> newFuts = getAllRebalanceFuturesByGroup(grid(1));
+
+        for (Map.Entry<CacheGroupContext, IgniteInternalFuture<Boolean>> grpFut : futs.entrySet()) {
+            IgniteInternalFuture<Boolean> fut = grpFut.getValue();
+            IgniteInternalFuture<Boolean> newFut = newFuts.get(grpFut.getKey());
+
+            if (serverJoin)
+                assertTrue(futureInfoString(fut), fut.isDone() && !fut.get());
+            else
+                assertSame(fut, newFut);
+
+            assertTrue(futureInfoString(newFut), newFut.isDone() && newFut.get());
+        }
+    }
+
+    /**
+     * @param futs Map of cache group contexts to their rebalance futures.
+     * @throws org.apache.ignite.IgniteCheckedException If failed.
+     */
+    public void checkAllFuturesCancelled(Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs)
+        throws org.apache.ignite.IgniteCheckedException {
+        for (IgniteInternalFuture<Boolean> fut : futs.values())
+            assertTrue(futureInfoString(fut), fut.isDone() && !fut.get());
+    }
+
+    /**
+     * @param futs Map of cache group contexts to their rebalance futures.
+     */
+    public void checkAllFuturesProcessing(Map<CacheGroupContext, IgniteInternalFuture<Boolean>> futs) {
+        for (IgniteInternalFuture<Boolean> fut : futs.values())
+            assertFalse(futureInfoString(fut), fut.isDone());
+    }
+
+    /**
+     * Finds all existing rebalance futures of all caches for the specified Ignite instance.
+     *
+     * @param ignite Ignite.
+     * @return Map of cache group contexts to their rebalance futures.
+     */
+    private Map<CacheGroupContext, IgniteInternalFuture<Boolean>> getAllRebalanceFuturesByGroup(IgniteEx ignite) {
+        HashMap<CacheGroupContext, IgniteInternalFuture<Boolean>> futs = new HashMap<>(ignite.cacheNames().size());
+
+        for (String cache : ignite.cacheNames()) {
+            IgniteInternalFuture<Boolean> fut = ignite.context().cache().cacheGroup(CU.cacheId(cache)).preloader().rebalanceFuture();
+
+            futs.put(ignite.context().cache().cacheGroup(CU.cacheId(cache)), fut);
+        }
+        return futs;
+    }
+
+    /**
+     * Prepares string representation of rebalance future.
+     *
+     * @param rebalanceFuture Rebalance future.
+     * @return Information string about passed future.
+     */
+    private String futureInfoString(IgniteInternalFuture<Boolean> rebalanceFuture) {
+        return "Fut: " + rebalanceFuture
+            + " is done: " + rebalanceFuture.isDone()
+            + " result: " + (rebalanceFuture.isDone() ? rebalanceFuture.result() : "None");
+    }
+
+    /**
+     * Starts a node with name <code>name</code> and blocks demand messages for the test caches.
+     *
+     * @param name Node instance name.
+     * @return Test communication SPI.
+     * @throws Exception If failed.
+     */
+    private TestRecordingCommunicationSpi startNodeWithBlockingRebalance(String name) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(name));
+
+        TestRecordingCommunicationSpi communicationSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        communicationSpi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage)msg;
+
+                if (CU.cacheId(DEFAULT_CACHE_NAME) != demandMessage.groupId())
+                    return false;
+
+                info("Message was caught: " + msg.getClass().getSimpleName()
+                    + " rebalanceId = " + U.field(demandMessage, "rebalanceId")
+                    + " to: " + node.consistentId()
+                    + " by cache id: " + demandMessage.groupId());
+
+                return true;
+            }
+
+            return false;
+        });
+
+        startGrid(cfg);
+
+        return communicationSpi;
+    }
+
+    /**
+     * Loads several data entries into the specified cache.
+     *
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     */
+    private void loadData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 100; i++)
+                streamer.addData(i, System.nanoTime());
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
new file mode 100644
index 0000000..25b44e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/RebalanceCancellationTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test cases in which rebalance proceeds and is not cancelled during various exchange events.
+ */
+public class RebalanceCancellationTest extends GridCommonAbstractTest {
+    /** Start cluster nodes. */
+    public static final int NODES_CNT = 3;
+
+    /** Count of backup partitions. */
+    public static final int BACKUPS = 2;
+
+    /** In memory data region name. */
+    public static final String MEM_REGION = "mem-region";
+
+    /** In memory cache name. */
+    public static final String MEM_REGOIN_CACHE = DEFAULT_CACHE_NAME + "_mem";
+
+    /** In memory dynamic cache name. */
+    public static final String DYNAMIC_CACHE_NAME = DEFAULT_CACHE_NAME + "_dynamic";
+
+    /** Node name suffix. Used by {@link CustomNodeFilter}. */
+    public static final String FILTERED_NODE_SUFFIX = "_filtered";
+
+    /** Persistence enabled. */
+    public boolean persistenceEnabled;
+
+    /** Add an additional non-persistent data region. */
+    public boolean addtiotionalMemRegion;
+
+    /** Filter node. */
+    public boolean filterNode;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(persistenceEnabled)))
+            .setCacheConfiguration(
+                new CacheConfiguration(DEFAULT_CACHE_NAME)
+                    .setAffinity(new RendezvousAffinityFunction(false, 15))
+                    .setBackups(BACKUPS));
+
+        if (addtiotionalMemRegion) {
+            cfg.setCacheConfiguration(cfg.getCacheConfiguration()[0],
+                new CacheConfiguration(MEM_REGOIN_CACHE)
+                    .setDataRegionName(MEM_REGION)
+                    .setBackups(BACKUPS))
+                .getDataStorageConfiguration()
+                .setDataRegionConfigurations(new DataRegionConfiguration()
+                    .setName(MEM_REGION));
+        }
+
+        if (filterNode) {
+            for (CacheConfiguration ccfg : cfg.getCacheConfiguration())
+                ccfg.setNodeFilter(new CustomNodeFilter());
+        }
+
+        return cfg;
+    }
+
+    /**
+     * Custom node filter. It filters out all nodes whose names contain {@link #FILTERED_NODE_SUFFIX}.
+     */
+    private static class CustomNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return !node.consistentId().toString().contains(FILTERED_NODE_SUFFIX);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Non baseline node leaves cluster with only persistent caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeLeftOnOnlyPersistenceCluster() throws Exception {
+        testRebalanceNoneBltNode(true, false, false);
+    }
+
+    /**
+     * Non baseline node leaves cluster with only memory caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeLeftOnOnlyInMemoryCluster() throws Exception {
+        testRebalanceNoneBltNode(false, false, false);
+    }
+
+    /**
+     * Non baseline node leaves cluster with persistent and memory caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeLeftOnMixedCluster() throws Exception {
+        testRebalanceNoneBltNode(true, true, false);
+    }
+
+    /**
+     * Non baseline node fails in cluster with only persistent caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeFailedOnOnlyPersistenceCluster() throws Exception {
+        testRebalanceNoneBltNode(true, false, true);
+    }
+
+    /**
+     * Non baseline node fails in cluster with only memory caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeFailedOnOnlyInMemoryCluster() throws Exception {
+        testRebalanceNoneBltNode(false, false, true);
+    }
+
+    /**
+     * Non baseline node fails in cluster with persistent and memory caches during rebalance.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceNoneBltNodeFailedOnMixedCluster() throws Exception {
+        testRebalanceNoneBltNode(true, true, true);
+    }
+
+    /**
+     * Filtered node leaves cluster with persistent region.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceFilteredNodeOnOnlyPersistenceCluster() throws Exception {
+        testRebalanceFilteredNode(true, false);
+    }
+
+    /**
+     * Filtered node leaves cluster with memory region.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceFilteredNodeOnOnlyInMemoryCluster() throws Exception {
+        testRebalanceFilteredNode(false, false);
+    }
+
+    /**
+     * Filtered node leaves cluster with persistent and memory regions.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceFilteredNodeOnMixedCluster() throws Exception {
+        testRebalanceFilteredNode(true, true);
+    }
+
+    /**
+     * Cache stops/starts several times on persistent cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceDynamicCacheOnOnlyPersistenceCluster() throws Exception {
+        testRebalanceDynamicCache(true, false);
+    }
+
+    /**
+     * Cache stops/starts several times on an in-memory cluster.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceDynamicCacheOnOnlyInMemoryCluster() throws Exception {
+        testRebalanceDynamicCache(false, false);
+    }
+
+    /**
+     * Cache stops/starts several times on a cluster with persistent and in-memory regions.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceDynamicCacheOnMixedCluster() throws Exception {
+        testRebalanceDynamicCache(true, true);
+    }
+
+    /**
+     * Triggers rebalance while dynamic caches are stopped and started.
+     *
+     * @param persistence Persistence flag.
+     * @param addtiotionalRegion Use an additional (non-default) region.
+     * @throws Exception If failed.
+     */
+    public void testRebalanceDynamicCache(boolean persistence, boolean addtiotionalRegion) throws Exception {
+        persistenceEnabled = persistence;
+        addtiotionalMemRegion = addtiotionalRegion;
+
+        IgniteEx ignite0 = startGrids(NODES_CNT);
+
+        ignite0.cluster().active(true);
+
+        grid(1).close();
+
+        for (String cache : ignite0.cacheNames())
+            loadData(ignite0, cache);
+
+        awaitPartitionMapExchange();
+
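+        // Return node 1 with demand messages blocked so that its rebalance stays in progress.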
+        TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+        commSpi1.waitForBlocked();
+
+        IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
+        int previousCaches = ignite0.cacheNames().size();
+
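+        // Repeatedly create and destroy a dynamic cache while the rebalance is blocked.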
+        for (int i = 0; i < 3; i++) {
+            ignite0.createCache(DYNAMIC_CACHE_NAME);
+
+            assertEquals(previousCaches + 1, ignite0.cacheNames().size());
+
+            ignite0.destroyCache(DYNAMIC_CACHE_NAME);
+
+            assertEquals(previousCaches, ignite0.cacheNames().size());
+        }
+
+        for (IgniteInternalFuture<Boolean> fut : futs)
+            assertFalse(futInfoString(fut), fut.isDone());
+
+        commSpi1.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        for (IgniteInternalFuture<Boolean> fut : futs)
+            assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+    }
+
+    /**
+     * Triggers rebalance when a non-baseline node leaves the topology.
+     *
+     * @param persistence Persistence flag.
+     * @param addtiotionalRegion Use an additional (non-default) region.
+     * @param fail If {@code true}, the node is failed forcibly via discovery SPI before being stopped.
+     * @throws Exception If failed.
+     */
+    public void testRebalanceNoneBltNode(boolean persistence, boolean addtiotionalRegion,
+        boolean fail) throws Exception {
+        persistenceEnabled = persistence;
+        addtiotionalMemRegion = addtiotionalRegion;
+
+        IgniteEx ignite0 = startGrids(NODES_CNT);
+
+        ignite0.cluster().active(true);
+
+        ignite0.cluster().baselineAutoAdjustEnabled(false);
+
+        IgniteEx newNode = startGrid(NODES_CNT);
+
+        grid(1).close();
+
+        for (String cache : ignite0.cacheNames())
+            loadData(ignite0, cache);
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+        commSpi1.waitForBlocked();
+
+        IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
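+        // Restart the non-baseline node several times while the rebalance is blocked.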
+        for (int i = 0; i < 3; i++) {
+            if (fail)
+                ignite0.configuration().getDiscoverySpi().failNode(newNode.localNode().id(), "Fail node by test.");
+
+            newNode.close();
+
+            checkTopology(NODES_CNT);
+
+            newNode = startGrid(NODES_CNT);
+
+            checkTopology(NODES_CNT + 1);
+        }
+
+        for (IgniteInternalFuture<Boolean> fut : futs) {
+            CacheGroupContext grp = U.field(fut, "grp");
+
+            if (CU.isPersistentCache(grp.config(), ignite0.configuration().getDataStorageConfiguration()))
+                assertFalse(futInfoString(fut), fut.isDone());
+        }
+
+        commSpi1.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        for (IgniteInternalFuture<Boolean> fut : futs) {
+            CacheGroupContext grp = U.field(fut, "grp");
+
+            if (CU.isPersistentCache(grp.config(), ignite0.configuration().getDataStorageConfiguration()))
+                assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+        }
+    }
+
+    /**
+     * Triggers rebalance when a filtered node leaves the topology.
+     *
+     * @param persistence Persistence flag.
+     * @param addtiotionalRegion Use an additional (non-default) region.
+     * @throws Exception If failed.
+     */
+    public void testRebalanceFilteredNode(boolean persistence, boolean addtiotionalRegion) throws Exception {
+        persistenceEnabled = persistence;
+        addtiotionalMemRegion = addtiotionalRegion;
+        filterNode = true;
+
+        IgniteEx ignite0 = startGrids(NODES_CNT);
+        IgniteEx filteredNode = startGrid(getTestIgniteInstanceName(NODES_CNT) + FILTERED_NODE_SUFFIX);
+
+        ignite0.cluster().active(true);
+
+        grid(1).close();
+
+        for (String cache : ignite0.cacheNames())
+            loadData(ignite0, cache);
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi commSpi1 = startNodeWithBlockingRebalance(getTestIgniteInstanceName(1));
+
+        commSpi1.waitForBlocked();
+
+        IgniteInternalFuture<Boolean>[] futs = getAllRebalanceFutures(ignite0);
+
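+        // Restart the filtered node several times; it owns no partitions of the filtered caches.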
+        for (int k = 0; k < 3; k++) {
+            filteredNode.close();
+
+            checkTopology(NODES_CNT);
+
+            filteredNode = startGrid(getTestIgniteInstanceName(NODES_CNT) + FILTERED_NODE_SUFFIX);
+        }
+
+        for (IgniteInternalFuture<Boolean> fut : futs)
+            assertFalse(futInfoString(fut), fut.isDone());
+
+        commSpi1.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        for (IgniteInternalFuture<Boolean> fut : futs)
+            assertTrue(futInfoString(fut), fut.isDone() && fut.get());
+    }
+
+    /**
+     * Finds all existing rebalance futures of all caches for the specified Ignite instance.
+     *
+     * @param ignite Ignite.
+     * @return Array of rebalance futures.
+     */
+    private IgniteInternalFuture<Boolean>[] getAllRebalanceFutures(IgniteEx ignite) {
+        IgniteInternalFuture<Boolean>[] futs = new IgniteInternalFuture[ignite.cacheNames().size()];
+
+        int i = 0;
+
+        for (String cache : ignite.cacheNames()) {
+            futs[i] = grid(1).context().cache()
+                .cacheGroup(CU.cacheId(cache)).preloader().rebalanceFuture();
+
+            assertFalse(futInfoString(futs[i]), futs[i].isDone());
+
+            i++;
+        }
+        return futs;
+    }
+
+    /**
+     * Prepares string representation of rebalance future.
+     *
+     * @param rebalanceFuture Rebalance future.
+     * @return Information string about passed future.
+     */
+    private String futInfoString(IgniteInternalFuture<Boolean> rebalanceFuture) {
+        return "Fut: " + rebalanceFuture
+            + " is done: " + rebalanceFuture.isDone()
+            + " result: " + (rebalanceFuture.isDone() ? rebalanceFuture.result() : "None");
+    }
+
+    /**
+     * Loads several data entries into the specified cache.
+     *
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     */
+    private void loadData(Ignite ignite, String cacheName) {
+        try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 100; i++)
+                streamer.addData(i, System.nanoTime());
+        }
+    }
+
+    /**
+     * Starts a node with name <code>name</code> and blocks demand messages for the test caches.
+     *
+     * @param name Node instance name.
+     * @return Test communication SPI.
+     * @throws Exception If failed.
+     */
+    private TestRecordingCommunicationSpi startNodeWithBlockingRebalance(String name) throws Exception {
+        IgniteConfiguration cfg = optimize(getConfiguration(name));
+
+        TestRecordingCommunicationSpi communicationSpi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        communicationSpi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage)msg;
+
+                if (CU.cacheId(DEFAULT_CACHE_NAME) != demandMessage.groupId()
+                    && CU.cacheId(MEM_REGOIN_CACHE) != demandMessage.groupId())
+                    return false;
+
+                info("Message was caught: " + msg.getClass().getSimpleName()
+                    + " rebalanceId = " + U.field(demandMessage, "rebalanceId")
+                    + " to: " + node.consistentId()
+                    + " by cache id: " + demandMessage.groupId());
+
+                return true;
+            }
+
+            return false;
+        });
+
+        startGrid(cfg);
+
+        return communicationSpi;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java
new file mode 100644
index 0000000..b67e055
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
+
+/**
+ * Test creates two exchanges at the same moment, so that while the first one is being executed the second one is already in the queue.
+ */
+@WithSystemProperty(key = IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION, value = "false")
+public class PendingExchangeTest extends GridCommonAbstractTest {
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setBackups(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        super.beforeTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Starts several caches so that the affinity history gets exhausted.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    @WithSystemProperty(key = IGNITE_AFFINITY_HISTORY_SIZE, value = "2")
+    public void testWithShortAffinityHistory() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            for (int i = 0; i < 20; i++) {
+                int finalNum = i;
+
+                compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new" + finalNum)));
+            }
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 20);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Starts one cache and several clients at the same moment, which leads to pending client exchanges.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartSeveralClients() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            for (int i = 0; i < 5; i++) {
+                int finalNum = i;
+
+                compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalNum)));
+            }
+
+            // Need to explicitly wait for the client exchanges to land in the exchange queue before the cache start exchange.
+            waitForExchnagesBegin(exchangeManager, 5);
+
+            compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 6);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Test checks that pending exchange will lead to stable topology.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartCachePending() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) ->
+            GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+    }
+
+    /**
+     * Start and stop cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStopStartCachePending() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+            compFut.add(GridTestUtils.runAsync(() -> ignite.destroyCache(DEFAULT_CACHE_NAME + "_new")));
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 1);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Starts several caches and stops them.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStopStartSeveralCachePending() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            for (int i = 0; i < 5; i++) {
+                int finalNum = i;
+
+                compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME +
+                    "_new" + finalNum)));
+                compFut.add(GridTestUtils.runAsync(() -> ignite.destroyCache(DEFAULT_CACHE_NAME +
+                    "_new" + finalNum)));
+            }
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 5);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Start server and cache.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartServerAndCache() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv")));
+            compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new")));
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 2);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Start several servers, clients and caches.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartSeveralServersWithClientsAndCaches() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            for (int i = 0; i < 3; i++) {
+                int finalSrvNum = i;
+
+                compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv" + finalSrvNum)));
+                compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalSrvNum)));
+                compFut.add(GridTestUtils.runAsync(() -> ignite.createCache(DEFAULT_CACHE_NAME + "_new" + finalSrvNum)));
+            }
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 9);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Start and stop several servers and clients.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartStopSeveralServersWithClients() throws Exception {
+        createClusterWithPendingExchnageDuringRebalance((ignite, exchangeManager) -> {
+            GridCompoundFuture compFut = new GridCompoundFuture();
+
+            for (int i = 0; i < 2; i++) {
+                int finalSrvNum = i;
+
+                compFut.add(GridTestUtils.runAsync(() -> startGrid("new_srv" + finalSrvNum)));
+                compFut.add(GridTestUtils.runAsync(() -> startClientGrid("new_client" + finalSrvNum)));
+
+                compFut.add(GridTestUtils.runAsync(() -> stopGrid("new_srv" + finalSrvNum)));
+                compFut.add(GridTestUtils.runAsync(() -> stopGrid("new_client" + finalSrvNum)));
+            }
+
+            compFut.markInitialized();
+
+            waitForExchnagesBegin(exchangeManager, 4);
+
+            return compFut;
+        });
+    }
+
+    /**
+     * Waits for the given number of exchanges to appear in the exchange queue.
+     *
+     * @param exchangeManager Exchange manager.
+     * @param exchanges Minimum number of exchange futures expected in the queue.
+     */
+    private void waitForExchnagesBegin(GridCachePartitionExchangeManager exchangeManager, int exchanges) {
+        GridWorker exchWorker = U.field(exchangeManager, "exchWorker");
+        Queue<CachePartitionExchangeWorkerTask> exchangeQueue = U.field(exchWorker, "futQ");
+
+        try {
+            assertTrue(GridTestUtils.waitForCondition(() -> {
+                int exFuts = 0;
+
+                for (CachePartitionExchangeWorkerTask task : exchangeQueue) {
+                    if (task instanceof GridDhtPartitionsExchangeFuture)
+                        exFuts++;
+                }
+
+                return exFuts >= exchanges;
+            }, 30_000));
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            fail("Can't wait for exchanges to begin.");
+        }
+    }
+
+    /**
+     * @param clo Closure triggering exchange.
+     * @throws Exception If failed.
+     */
+    private void createClusterWithPendingExchnageDuringRebalance(PendingExchangeTrigger clo) throws Exception {
+        IgniteEx ignite0 = startGrids(3);
+
+        try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < 1000; i++)
+                streamer.addData(i, i);
+        }
+
+        awaitPartitionMapExchange();
+
+        GridCachePartitionExchangeManager exchangeManager1 = ignite(1).context().cache().context().exchange();
+
+        CountDownLatch exchangeLatch = new CountDownLatch(1);
+
+        AffinityTopologyVersion readyTop = exchangeManager1.readyAffinityVersion();
+
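+        // Hold the exchange on node 1 until the latch is released, so subsequently triggered exchanges queue up behind it.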
+        exchangeManager1.registerExchangeAwareComponent(new PartitionsExchangeAware() {
+            @Override public void onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+                U.awaitQuiet(exchangeLatch);
+            }
+        });
+
+        IgniteInternalFuture startNodeFut = GridTestUtils.runAsync(() -> stopGrid(2));
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            exchangeManager1.lastTopologyFuture().initialVersion().after(readyTop), 10_000));
+
+        IgniteInternalFuture exchangeTrigger = clo.trigger(ignite0, exchangeManager1);
+
+        assertTrue(GridTestUtils.waitForCondition(exchangeManager1::hasPendingServerExchange, 10_000));
+
+        exchangeLatch.countDown();
+
+        startNodeFut.get(10_000);
+        exchangeTrigger.get(10_000);
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Closure interface for triggering a pending exchange.
+     */
+    private interface PendingExchangeTrigger {
+        /**
+         * Invoke exchange.
+         *
+         * @param ignite Ignite.
+         * @param exchangeManager Exchange manager.
+         * @return Future that completes when the triggered operations finish.
+         */
+        IgniteInternalFuture trigger(Ignite ignite, GridCachePartitionExchangeManager exchangeManager);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
index 80ca5a8..1d703cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigurationTemplateTest.java
@@ -341,7 +341,7 @@ public class IgniteCacheConfigurationTemplateTest extends GridCommonAbstractTest
 
         evt = evtLatch.await(3000, TimeUnit.MILLISECONDS);
 
-        assertTrue(evt);
+        assertFalse(evt);
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 4427447..3e85257 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -1329,7 +1329,9 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
             }
         });
 
-        checkAffinity(cnt, topVer(ord - 1, 1), true);
+        AffinityTopologyVersion currentTop = ignite(0).context().cache().context().exchange().readyAffinityVersion();
+
+        checkAffinity(cnt, currentTop, true);
 
         stopNode(stopId, ord);
 
@@ -1508,7 +1510,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
 
         for (int i = 0; i < NODES; i++) {
             TestRecordingCommunicationSpi spi =
-                    (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+                (TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
 
             spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
                 @Override public boolean apply(ClusterNode node, Message msg) {
@@ -1524,8 +1526,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
 
         IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Void>() {
             @Override public Void call() throws Exception {
-                while (!joined.get())
-                    U.sleep(10);
+                for (int j = 1; j < NODES; j++) {
+                    TestRecordingCommunicationSpi spi =
+                        (TestRecordingCommunicationSpi)ignite(j).configuration().getCommunicationSpi();
+
+                    spi.waitForBlocked();
+                }
 
                 for (int i = 0; i < NODES; i++)
                     stopGrid(getTestIgniteInstanceName(i), false, false);
@@ -2147,6 +2153,30 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
         for (int i = 0; i < ITERATIONS; i++) {
             log.info("Iteration: " + i);
 
+            TestRecordingCommunicationSpi[] testSpis = new TestRecordingCommunicationSpi[NODES];
+
+            for (int j = 0; j < NODES; j++) {
+                testSpis[j] = new TestRecordingCommunicationSpi();
+
+                testSpis[j].blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+            }
+
+            // Ensure exchanges merge.
+            spiC = igniteInstanceName -> testSpis[getTestIgniteInstanceIndex(igniteInstanceName)];
+
+            GridTestUtils.runAsync(() -> {
+                try {
+                    for (int j = 1; j < NODES; j++)
+                        testSpis[j].waitForBlocked();
+                }
+                catch (InterruptedException e) {
+                    log.error("Thread interrupted.", e);
+                }
+
+                for (TestRecordingCommunicationSpi testSpi : testSpis)
+                    testSpi.stopBlock();
+            });
+
             startGridsMultiThreaded(NODES);
 
             for (int t = 0; t < NODES; t++)
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
index a8c185c..18e4aff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java
@@ -75,7 +75,7 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
 
         cfg.setCommunicationSpi(new TestCommunicationSpi());
 
-        cfg.setIncludeEventTypes(EventType.EVTS_ALL);
+        cfg.setIncludeEventTypes(EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
 
         return cfg;
     }
@@ -156,11 +156,11 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
 
         Ignite ignite1 = startClientGrid(1);
 
-        assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
 
         ignite1.close();
 
-        assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
 
         ignite1 = startClientGrid(1);
 
@@ -176,11 +176,13 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr
             }
         }, EventType.EVT_CACHE_REBALANCE_STARTED, EventType.EVT_CACHE_REBALANCE_STOPPED);
 
-        assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
 
         startGrid(2);
 
-        assertTrue(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
+        awaitPartitionMapExchange();
+
+        assertFalse(evtLatch0.await(1000, TimeUnit.MILLISECONDS));
         assertFalse(evtLatch1.await(1000, TimeUnit.MILLISECONDS));
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
index 15d4eba..9d6c86d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictPartitionInLogTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
@@ -61,6 +62,9 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
     /** Cache names. */
     private static final String[] DEFAULT_CACHE_NAMES = {DEFAULT_CACHE_NAME + "0", DEFAULT_CACHE_NAME + "1"};
 
+    /** Cache's backups. */
+    public int backups = 0;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
@@ -88,7 +92,7 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
                     .map(cacheName ->
                         new CacheConfiguration<>(cacheName)
                             .setGroupName(cacheName)
-                            .setBackups(0)
+                            .setBackups(backups)
                             .setAffinity(new RendezvousAffinityFunction(false, 12))
                             .setIndexedTypes(Integer.class, Integer.class)
                     ).toArray(CacheConfiguration[]::new)
@@ -96,8 +100,7 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test checks the presence of evicted partitions (RENTING state) in log
-     * without duplicate partitions.
+     * Test checks the presence of evicted partitions (RENTING state) in log without duplicate partitions.
      *
      * @throws Exception If failed.
      */
@@ -128,14 +131,19 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Test checks the presence of evicted partitions (MOVING state) in log
-     * without duplicate partitions.
+     * Test checks the presence of evicted partitions (MOVING state) in log without duplicate partitions.
      *
      * @throws Exception If failed.
      */
     @Test
     public void testEvictPartByMovingState() throws Exception {
-        IgniteEx node = startGrid(0);
+        backups = 1;
+
+        IgniteEx node = startGrids(3);
+        awaitPartitionMapExchange();
+
+        stopGrid(2);
+
         awaitPartitionMapExchange();
 
         Map<Integer, Collection<Integer>> parseParts = new ConcurrentHashMap<>();
@@ -162,7 +170,12 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
             .collect(toList());
 
         parts.subList(0, parts.size() - 1).forEach(GridDhtLocalPartition::clearAsync);
-        rebFuts.forEach(rebFut -> rebFut.onDone(Boolean.TRUE));
+
+        rebFuts.forEach(rebFut -> {
+            GridTestUtils.setFieldValue(rebFut, "next", null);
+
+            rebFut.onDone(Boolean.TRUE);
+        });
 
         doSleep(100);
         rebFuts.forEach(GridFutureAdapter::reset);
@@ -227,24 +240,27 @@ public class EvictPartitionInLogTest extends GridCommonAbstractTest {
             .map(cacheName -> "grpId=" + CU.cacheId(cacheName) + ", grpName=" + cacheName)
             .collect(toList());
 
-        Pattern extractParts = Pattern.compile(reason + "=\\[([0-9\\-,]*)]]");
+        Pattern extractParts = Pattern.compile(reason + "=\\[([0-9\\-,]*)]");
         Pattern extractGrpId = Pattern.compile("grpId=([0-9]*)");
 
         LogListener.Builder builder = LogListener.matches(logStr -> {
-            if (logStr.contains("Partitions have been scheduled for eviction:")) {
-                Matcher grpIdMatcher = extractGrpId.matcher(logStr);
-                Matcher partsMatcher = extractParts.matcher(logStr);
+            String msgPrefix = "Partitions have been scheduled for eviction:";
+            if (!logStr.contains(msgPrefix))
+                return false;
+
+            of(logStr.replace(msgPrefix, "").split("], \\[")).forEach(s -> {
+
+                Matcher grpIdMatcher = extractGrpId.matcher(s);
+                Matcher partsMatcher = extractParts.matcher(s);
 
                 //find and parsing grpId and partitions
-                while (grpIdMatcher.find() && partsMatcher.find()) {
+                if (grpIdMatcher.find() && partsMatcher.find()) {
                     evictParts.computeIfAbsent(parseInt(grpIdMatcher.group(1)), i -> new ConcurrentLinkedQueue<>())
                         .addAll(parseContentCompactStr(partsMatcher.group(1)));
                 }
+            });
 
-                return cacheInfos.stream().allMatch(logStr::contains);
-            }
-            else
-                return false;
+            return cacheInfos.stream().allMatch(logStr::contains);
         });
 
         return builder.build();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
index 75476e6..77ebd3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/LocalWalModeChangeDuringRebalancingSelfTest.java
@@ -519,9 +519,6 @@ public class LocalWalModeChangeDuringRebalancingSelfTest extends GridCommonAbstr
         // Await fully exchange complete.
         awaitExchange(newIgnite);
 
-        for (Ignite g : G.allGrids())
-            g.cache(DEFAULT_CACHE_NAME).rebalance();
-
         assertFalse(grpCtx.walEnabled());
 
         // TODO : test with client node as well
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 597ff2b..02bbf6d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -36,7 +36,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -82,6 +81,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     /** Block message predicate to set to Communication SPI in node configuration. */
     private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
 
+    /** */
+    private int backups;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
@@ -93,20 +95,22 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(CACHE_NAME)
             .setAtomicityMode(CacheAtomicityMode.ATOMIC)
             .setRebalanceMode(CacheRebalanceMode.ASYNC)
-            .setCacheMode(CacheMode.REPLICATED)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(backups)
             .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT));
 
         cfg.setCacheConfiguration(ccfg);
 
         DataStorageConfiguration dbCfg = new DataStorageConfiguration()
-                    .setWalHistorySize(Integer.MAX_VALUE)
-                    .setWalMode(WALMode.LOG_ONLY)
-                    .setCheckpointFrequency(15 * 60 * 1000)
-                    .setDefaultDataRegionConfiguration(
-                        new DataRegionConfiguration()
-                            .setPersistenceEnabled(true)
-                            .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
-                    );
+            .setWalSegmentSize(4 * 1024 * 1024)
+            .setWalHistorySize(Integer.MAX_VALUE)
+            .setWalMode(WALMode.LOG_ONLY)
+            .setCheckpointFrequency(15 * 60 * 1000)
+            .setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+            );
 
         cfg.setDataStorageConfiguration(dbCfg);
 
@@ -153,6 +157,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
      */
     @Test
     public void testSimple() throws Exception {
+        backups = 4;
+
         IgniteEx ig0 = startGrid(0);
         IgniteEx ig1 = startGrid(1);
 
@@ -193,6 +199,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
      */
     @Test
     public void testRebalanceRemoves() throws Exception {
+        backups = 4;
+
         IgniteEx ig0 = startGrid(0);
         IgniteEx ig1 = startGrid(1);
 
@@ -241,9 +249,11 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
      */
     @Test
     public void testWithLocalWalChange() throws Exception {
+        backups = 4;
+
         System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, "true");
 
-        IgniteEx crd = (IgniteEx) startGrids(4);
+        IgniteEx crd = startGrids(4);
 
         crd.cluster().active(true);
 
@@ -260,7 +270,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         stopAllGrids();
 
-        IgniteEx ig0 = (IgniteEx) startGrids(2);
+        IgniteEx ig0 = startGrids(2);
 
         ig0.cluster().active(true);
 
@@ -331,6 +341,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
      */
     @Test
     public void testWithGlobalWalChange() throws Exception {
+        backups = 4;
+
         // Prepare some data.
         IgniteEx crd = (IgniteEx) startGrids(3);
 
@@ -411,6 +423,8 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
      */
     @Test
     public void testRebalanceCancelOnSupplyError() throws Exception {
+        backups = 4;
+
         // Prepare some data.
         IgniteEx crd = (IgniteEx) startGrids(3);
 
@@ -459,7 +473,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         final GridCachePreloader preloader = demanderNode.cachex(CACHE_NAME).context().group().preloader();
 
         GridTestUtils.waitForCondition(() ->
-            ((GridDhtPartitionDemander.RebalanceFuture) preloader.rebalanceFuture()).topologyVersion().equals(curTopVer),
+                ((GridDhtPartitionDemander.RebalanceFuture)preloader.rebalanceFuture()).topologyVersion().equals(curTopVer),
             getTestTimeout()
         );
 
@@ -500,71 +514,6 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testRebalanceRestartWithNodeBlinking() throws Exception {
-        int entryCnt = PARTS_CNT * 200;
-
-        // Start 3 nodes cluster:
-        //  node0 - coordinator (main supplier for historical rebalance)
-        //  node1 - some node that will generate NODE_LEFT/NODE_JOINED events
-        //  node2 - historical rebalance demander
-        IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
-
-        crd.cluster().state(ClusterState.ACTIVE);
-        crd.cluster().baselineAutoAdjustEnabled(false);
-
-        IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
-
-        for (int i = 0; i < entryCnt / 2; i++)
-            cache0.put(i, String.valueOf(i));
-
-        forceCheckpoint();
-
-        stopGrid(2);
-
-        for (int i = entryCnt / 2; i < entryCnt; i++)
-            cache0.put(i, String.valueOf(i));
-
-        blockMessagePredicate = (node, msg) -> {
-            if (msg instanceof GridDhtPartitionDemandMessage) {
-                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
-
-                return msg0.groupId() == CU.cacheId(CACHE_NAME) && msg0.partitions().size() == PARTS_CNT;
-            }
-
-            return false;
-        };
-
-        startGrid(2);
-
-        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
-
-        // Wait until node2 starts historical rebalancning.
-        spi2.waitForBlocked(1);
-
-        // Interruption of rebalancing by NODE_LEFT event, historical supplier should not be provided.
-        stopGrid(1);
-
-        // Wait until the full rebalance begins.
-        spi2.waitForBlocked(2);
-
-        // Interrupting it again by NODE_JOINED and get a historical supplier again.
-        startGrid(1);
-
-        spi2.stopBlock();
-
-        awaitPartitionMapExchange();
-
-        // Verify data on demander node.
-        for (int i = 0; i < entryCnt; i++)
-            assertEquals(String.valueOf(i), grid(2).cache(CACHE_NAME).get(i));
-    }
-
-    /**
      *
      */
     private static class IndexedObject {
@@ -666,6 +615,72 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Check that historical rebalance doesn't start on the cleared partition when some cluster node restarts.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRebalanceRestartWithNodeBlinking() throws Exception {
+        backups = 2;
+
+        int entryCnt = PARTS_CNT * 200;
+
+        IgniteEx crd = (IgniteEx)startGridsMultiThreaded(3);
+
+        crd.cluster().active(true);
+
+        IgniteCache<Integer, String> cache0 = crd.cache(CACHE_NAME);
+
+        for (int i = 0; i < entryCnt / 2; i++)
+            cache0.put(i, String.valueOf(i));
+
+        forceCheckpoint();
+
+        stopGrid(2);
+
+        for (int i = entryCnt / 2; i < entryCnt; i++)
+            cache0.put(i, String.valueOf(i));
+
+        blockMessagePredicate = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(CACHE_NAME);
+            }
+
+            return false;
+        };
+
+        startGrid(2);
+
+        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
+
+        // Wait until node2 starts historical rebalancing.
+        spi2.waitForBlocked(1);
+
+        // Rebalancing is interrupted because the supplier left; it should remap to a new supplier with full rebalancing.
+        stopGrid(0);
+
+        // Wait until the full rebalance begins with g1 as a supplier.
+        spi2.waitForBlocked(2);
+
+        blockMessagePredicate = null;
+
+        startGrid(0); // Should not force rebalancing remap.
+
+        startGrid(4);
+        resetBaselineTopology(); // Should force rebalancing remap.
+
+        spi2.waitForBlocked(3);
+        spi2.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        // Verify data on demander node.
+        assertPartitionsSame(idleVerify(grid(0), CACHE_NAME));
+    }
+
+    /**
      *
      */
     static class FailingIOFactory implements FileIOFactory {
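Note on the hunk above: the blockMessagePredicate field is consumed in getConfiguration(), which is outside the shown context. A minimal sketch of that wiring, assuming the test installs a TestRecordingCommunicationSpi per node (the real setup in IgniteWalRebalanceTest may differ in details):

    // Sketch only: wire the optional predicate into the recording SPI.
    TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();

    if (blockMessagePredicate != null)
        spi.blockMessages(blockMessagePredicate); // Matching messages are held until stopBlock().

    cfg.setCommunicationSpi(spi);

With such wiring, the TestRecordingCommunicationSpi.spi(grid(2)).waitForBlocked(...) and stopBlock() calls in testRebalanceRestartWithNodeBlinking control exactly when the blocked demand messages are released.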
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
index eba9811..f4c7fc7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java
@@ -889,7 +889,12 @@ public class IgniteClusterSnapshotSelfTest extends AbstractSnapshotSelfTest {
 
         for (Ignite grid : G.allGrids()) {
             TestRecordingCommunicationSpi.spi(grid)
-                .blockMessages((node, msg) -> msg instanceof GridDhtPartitionsSingleMessage);
+                .blockMessages((node, msg) -> {
+                    if (msg instanceof GridDhtPartitionsSingleMessage)
+                        return ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+
+                    return false;
+                });
         }
 
         IgniteFuture<Void> fut = grid(1).snapshot().createSnapshot(SNAPSHOT_NAME);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
index a2e63cb..1d94ed1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxCrossCacheMapOnInvalidTopologyTest.java
@@ -34,7 +34,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -181,7 +180,7 @@ public class TxCrossCacheMapOnInvalidTopologyTest extends GridCommonAbstractTest
                     GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)m;
 
                     // Allow full rebalance for every group except cache 2.
-                    if (msg.groupId() == CU.cacheId(CACHE1) || msg.groupId() == CU.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+                    if (msg.groupId() != CU.cacheId(CACHE2))
                         return false;
 
                     // Allow only first batch for cache 2.
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java
new file mode 100644
index 0000000..8eb27c6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheAffinityVersionTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.platform;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Task retrieves the affinity topology version for a cache.
+ */
+public class PlatformCacheAffinityVersionTask extends ComputeTaskAdapter<String, Object> {
+    /** {@inheritDoc} */
+    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        String arg) throws IgniteException {
+
+        return Collections.singletonMap(new Job(arg), F.find(subgrid, null,
+            (IgnitePredicate<? super ClusterNode>)ClusterNode::isLocal));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+        return results.get(0).getData();
+    }
+
+    /**
+     * Job.
+     */
+    private static class Job extends ComputeJobAdapter {
+        /** Ignite instance. */
+        @IgniteInstanceResource
+        protected transient Ignite ignite;
+
+        /**
+         * Cache name.
+         */
+        private String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public Job(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object execute() throws IgniteException {
+            AffinityTopologyVersion topVer = ((IgniteEx)ignite).context().cache().cache(cacheName).context().group()
+                .topology().readyTopologyVersion();
+
+            return new Long[] {topVer.topologyVersion(), (long)topVer.minorTopologyVersion()};
+        }
+    }
+}
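The new task above is driven from the .NET tests through ICompute.ExecuteJavaTask (see the WaitTopology helper added to TestUtils.Common.cs further down). For orientation, a hypothetical Java-side call would look roughly like this; the instance name "grid0" is made up for the sketch:

    // Sketch: read the ready affinity topology version of the system cache via the task.
    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
    import org.apache.ignite.platform.PlatformCacheAffinityVersionTask;

    Ignite ignite = Ignition.ignite("grid0");

    Long[] ver = (Long[])ignite.compute()
        .execute(PlatformCacheAffinityVersionTask.class, "ignite-sys-cache");

    AffinityTopologyVersion topVer = new AffinityTopologyVersion(ver[0], ver[1].intValue());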
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index bd52959..74e78a5 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -710,7 +710,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                 GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
 
                                 boolean notPrimary = !affNodes.isEmpty() &&
-                                    !exchMgr.rebalanceTopologyVersion().equals(AffinityTopologyVersion.NONE) &&
                                     !affNodes.get(0).equals(dht.context().affinity().primaryByPartition(p, readyVer));
 
                                 if (affNodesCnt != ownerNodesCnt || !affNodes.containsAll(owners) ||
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index 4898008..a1d9902 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.cache.affinity.PendingExchangeTest;
 import org.apache.ignite.internal.processors.cache.CacheIgniteOutOfMemoryExceptionTest;
 import org.apache.ignite.internal.processors.cache.CacheNoAffinityExchangeTest;
 import org.apache.ignite.internal.processors.cache.ClientFastReplyCoordinatorFailureTest;
@@ -90,6 +91,7 @@ public class IgniteCacheTestSuite6 {
         GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, PendingExchangeTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClientFastReplyCoordinatorFailureTest.class, ignoredTests);
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 77492a0..d824cd0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -20,7 +20,10 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.cache.BreakRebalanceChainTest;
+import org.apache.ignite.cache.NotOptimizedRebalanceTest;
 import org.apache.ignite.cache.RebalanceCompleteDuringExchangeTest;
+import org.apache.ignite.cache.RebalanceCancellationTest;
 import org.apache.ignite.cache.ResetLostPartitionTest;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
@@ -78,6 +81,9 @@ public class IgnitePdsTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsCacheWalDisabledOnRebalancingTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsStartWIthEmptyArchive.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CorruptedTreeFailureHandlingTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, RebalanceCancellationTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, NotOptimizedRebalanceTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, BreakRebalanceChainTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 35aff9e..2c672e1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -102,6 +102,8 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
 
         final IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
 
+        grid(0).cluster().baselineAutoAdjustEnabled(false);
+
         assert cache != null;
 
         for (int i = 0; i < KEY_CNT; i++)
@@ -168,7 +170,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
 
         info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']');
 
-        boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);
+        boolean success = lsnr.awaitEvents(countRebalances(GRID_CNT, restartCnt.get()), 15000);
 
         for (int i = 0; i < GRID_CNT; i++)
             grid(i).events().stopLocalListen(lsnr, EventType.EVT_CACHE_REBALANCE_STOPPED);
@@ -177,14 +179,22 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
     }
 
     /**
+     * Calculates the number of rebalance events expected to be stopped.
+     *
+     * @param nodes Number of nodes in the cluster.
+     * @param restarts Number of restarts of a separate node that have happened.
+     * @return Number of rebalance events that will be triggered.
+     */
+    protected int countRebalances(int nodes, int restarts) {
+        return nodes * restarts;
+    }
+
+    /**
      *
      */
     protected IgniteInternalFuture createRestartAction(final AtomicBoolean done, final AtomicInteger restartCnt) throws Exception {
         return multithreadedAsync(new Callable<Object>() {
             /** */
-            private final long nodeLifeTime = 2 * 1000;
-
-            /** */
             private final int logFreq = 50;
 
             @SuppressWarnings({"BusyWait"})
@@ -194,10 +204,12 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
 
                     startGrid(idx);
 
-                    Thread.sleep(nodeLifeTime);
+                    resetBaselineTopology();
 
                     stopGrid(idx);
 
+                    resetBaselineTopology();
+
                     int c = restartCnt.incrementAndGet();
 
                     if (c % logFreq == 0)
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
index c6e9e82..8acb09f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteChangingBaselineCacheQueryNodeRestartSelfTest.java
@@ -97,11 +97,12 @@ public class IgniteChangingBaselineCacheQueryNodeRestartSelfTest extends IgniteC
                         lastOpChangeUp = true;
                     }
 
-                    grid(0).cluster().setBaselineTopology(baselineNodes(grid(0).cluster().forServers().nodes()));
+                    resetBaselineTopology();
 
                     Thread.sleep(baselineTopChangeInterval);
 
-                    int c = restartCnt.incrementAndGet();
+                    // Only stopping a node triggers rebalance.
+                    int c = lastOpChangeUp ? restartCnt.get() : restartCnt.incrementAndGet();
 
                     if (c % logFreq == 0)
                         info("BaselineTopology changes: " + c);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
index fc8f5dd..24d2b49 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/baseline/IgniteStableBaselineCacheQueryNodeRestartsSelfTest.java
@@ -16,9 +16,13 @@
  */
 package org.apache.ignite.internal.processors.database.baseline;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQueryNodeRestartSelfTest;
 
 /**
@@ -59,6 +63,36 @@ public class IgniteStableBaselineCacheQueryNodeRestartsSelfTest extends IgniteCa
     }
 
     /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture createRestartAction(final AtomicBoolean done, final AtomicInteger restartCnt) throws Exception {
+        return multithreadedAsync(new Callable<Object>() {
+            /** */
+            private final int logFreq = 50;
+
+            @SuppressWarnings({"BusyWait"})
+            @Override public Object call() throws Exception {
+                while (!done.get()) {
+                    int idx = gridCount();
+
+                    startGrid(idx);
+
+                    stopGrid(idx);
+
+                    int c = restartCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Node restarts: " + c);
+                }
+
+                return true;
+            }
+        }, 1, "restart-thread");
+    }
+
+    @Override protected int countRebalances(int nodes, int restarts) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
         stopAllGrids();
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index a3a8979..1e913db 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -26,10 +26,10 @@ namespace Apache.Ignite.Core.Tests.Compute
     using System.Threading;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Client;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Compute;
-    using Apache.Ignite.Core.Events;
     using Apache.Ignite.Core.Impl;
     using Apache.Ignite.Core.Resource;
     using NUnit.Framework;
@@ -67,12 +67,12 @@ namespace Apache.Ignite.Core.Tests.Compute
 
             _grid1 = Ignition.Start(Configuration(configs.Item1));
             _grid2 = Ignition.Start(Configuration(configs.Item2));
-            _grid3 = Ignition.Start(Configuration(configs.Item3));
 
-            // Wait for rebalance.
-            var events = _grid1.GetEvents();
-            events.EnableLocal(EventType.CacheRebalanceStopped);
-            events.WaitForLocal(EventType.CacheRebalanceStopped);
+            AffinityTopologyVersion waitingTop = new AffinityTopologyVersion(2, 1);
+            
+            Assert.True(_grid1.WaitTopology(waitingTop), "Failed to wait topology " + waitingTop);
+            
+            _grid3 = Ignition.Start(Configuration(configs.Item3));
 
             // Start thin client.
             _igniteClient = Ignition.StartClient(GetThinClientConfiguration());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
index ca98801..556fbf4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/EventsTestLocalListeners.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Tests
 {
     using System.Collections.Generic;
     using System.Linq;
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Events;
@@ -38,23 +39,41 @@ namespace Apache.Ignite.Core.Tests
         [Test]
         public void TestRebalanceEvents()
         {
+            ICollection<int> cacheRebalanceStopStartEvts = new[]
+            {
+                EventType.CacheRebalanceStarted,
+                EventType.CacheRebalanceStopped
+            };
+            
             var listener = new Listener<CacheRebalancingEvent>();
 
-            using (Ignition.Start(GetConfig(listener, EventType.CacheRebalanceAll)))
+            using (IIgnite ignite0 = Ignition.Start(GetConfig(listener, cacheRebalanceStopStartEvts, "TestRebalanceEvents")))
             {
-                var events = listener.GetEvents();
+                var cache = ignite0.GetOrCreateCache<int, int>(CacheName);
 
-                Assert.AreEqual(2, events.Count);
+                for (int i = 0; i < 2000; i++)
+                    cache[i] = i;
+                
+                using (IIgnite ignite1 = Ignition.Start(GetConfig(listener, cacheRebalanceStopStartEvts)))
+                {
+                    AffinityTopologyVersion afterRebalanceTop =  new AffinityTopologyVersion(2, 1);
+                    
+                    Assert.True(ignite1.WaitTopology(afterRebalanceTop, CacheName), "Failed to wait topology " + afterRebalanceTop);
+                    
+                    var events = listener.GetEvents();
+
+                    Assert.AreEqual(2, events.Count);
 
-                var rebalanceStart = events.First();
+                    var rebalanceStart = events.First();
 
-                Assert.AreEqual(CacheName, rebalanceStart.CacheName);
-                Assert.AreEqual(EventType.CacheRebalanceStarted, rebalanceStart.Type);
+                    Assert.AreEqual(CacheName, rebalanceStart.CacheName);
+                    Assert.AreEqual(EventType.CacheRebalanceStarted, rebalanceStart.Type);
 
-                var rebalanceStop = events.Last();
+                    var rebalanceStop = events.Last();
 
-                Assert.AreEqual(CacheName, rebalanceStop.CacheName);
-                Assert.AreEqual(EventType.CacheRebalanceStopped, rebalanceStop.Type);
+                    Assert.AreEqual(CacheName, rebalanceStop.CacheName);
+                    Assert.AreEqual(EventType.CacheRebalanceStopped, rebalanceStop.Type);
+                }
             }
         }
 
@@ -131,11 +150,13 @@ namespace Apache.Ignite.Core.Tests
         /// <summary>
         /// Gets the configuration.
         /// </summary>
-        private static IgniteConfiguration GetConfig<T>(IEventListener<T> listener, ICollection<int> eventTypes) 
+        private static IgniteConfiguration GetConfig<T>(IEventListener<T> listener, ICollection<int> eventTypes,
+            string instanceName = null) 
             where T : IEvent
         {
             return new IgniteConfiguration(TestUtils.GetTestConfiguration())
             {
+                IgniteInstanceName = instanceName,
                 LocalEventListeners = new[]
                 {
                     new LocalEventListener<T>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
index fffff90..92f3761 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.Common.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Core.Tests
     using System.Reflection;
     using System.Threading;
     using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache.Affinity;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Discovery.Tcp;
     using Apache.Ignite.Core.Discovery.Tcp.Static;
@@ -47,6 +48,9 @@ namespace Apache.Ignite.Core.Tests
         /** */
         public const int DfltBusywaitSleepInterval = 200;
 
+        /** System cache name. */
+        public const string UtilityCacheName = "ignite-sys-cache";
+
         /** Work dir. */
         private static readonly string WorkDir =
             // ReSharper disable once AssignNullToNotNullAttribute
@@ -245,6 +249,45 @@ namespace Apache.Ignite.Core.Tests
         }
 
         /// <summary>
+        /// Waits for a particular affinity topology version on a specific cache (the system cache by default).
+        /// </summary>
+        /// <param name="grid">Grid.</param>
+        /// <param name="waitingTop">Topology version.</param>
+        /// <param name="cacheName">Cache name.</param>
+        /// <param name="timeout">Timeout.</param>
+        /// <returns>
+        ///   <c>True</c> if the topology reached the required version.
+        /// </returns>
+        public static bool WaitTopology(this IIgnite grid, AffinityTopologyVersion waitingTop,
+            string cacheName = UtilityCacheName, int timeout = 30000)
+        {
+            int checkPeriod = 200;
+
+            // Wait for late affinity.
+            for (var iter = 0;; iter++)
+            {
+                var result = grid.GetCompute().ExecuteJavaTask<long[]>(
+                    "org.apache.ignite.platform.PlatformCacheAffinityVersionTask", cacheName);
+                var top = new AffinityTopologyVersion(result[0], (int) result[1]);
+                if (top.CompareTo(waitingTop) >= 0)
+                {
+                    Console.Out.WriteLine("Current topology: " + top);
+                    break;
+                }
+
+                if (iter % 10 == 0)
+                    Console.Out.WriteLine("Waiting topology cur=" + top + " wait=" + waitingTop);
+
+                if (iter * checkPeriod > timeout)
+                    return false;
+
+                Thread.Sleep(checkPeriod);
+            }
+
+            return true;
+        }
+
+        /// <summary>
         /// Waits for condition, polling in busy wait loop.
         /// </summary>
         /// <param name="cond">Condition.</param>