You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2020/07/06 10:14:55 UTC

[ignite] branch master updated: IGNITE-13193 Added fallback to full rebalance if historical one has failed.

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new fde65a6  IGNITE-13193 Added fallback to full rebalance if historical one has failed.
fde65a6 is described below

commit fde65a6bdaa782108ea50b2bb746a9980aa5b680
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Mon Jul 6 13:14:23 2020 +0300

    IGNITE-13193 Added fallback to full rebalance if historical one has failed.
---
 .../cache/GridCachePartitionExchangeManager.java   | 310 +++++------
 .../processors/cache/GridCachePreloader.java       |   5 +-
 .../cache/GridCachePreloaderAdapter.java           |   5 +-
 .../dht/preloader/GridDhtPartitionDemander.java    |  55 +-
 .../dht/preloader/GridDhtPartitionSupplier.java    |  91 ++--
 .../preloader/GridDhtPartitionsExchangeFuture.java |  74 ++-
 .../dht/preloader/GridDhtPreloader.java            |  27 +-
 ...java => IgniteHistoricalIteratorException.java} |  33 +-
 .../preloader/RebalanceReassignExchangeTask.java   |  15 +-
 .../cache/persistence/GridCacheOffheapManager.java | 142 ++---
 .../IgniteShutdownOnSupplyMessageFailureTest.java  |  17 +
 .../persistence/db/wal/IgniteWalRebalanceTest.java | 581 ++++++++++++++++++++-
 12 files changed, 1050 insertions(+), 305 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 15aad21..fc8bef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1173,8 +1173,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * @param exchId Exchange ID.
      */
-    public void forceReassign(GridDhtPartitionExchangeId exchId) {
-        exchWorker.forceReassign(exchId);
+    public void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) {
+        exchWorker.forceReassign(exchId, fut);
     }
 
     /**
@@ -2878,9 +2878,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /** */
         private AffinityTopologyVersion lastFutVer;
 
-        /** Busy flag used as performance optimization to stop current preloading. */
-        private volatile boolean busy;
-
         /** */
         private boolean crd;
 
@@ -2901,9 +2898,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         /**
          * @param exchId Exchange ID.
          */
-        void forceReassign(GridDhtPartitionExchangeId exchId) {
-            if (!hasPendingExchange() && !busy)
-                futQ.add(new RebalanceReassignExchangeTask(exchId));
+        void forceReassign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture fut) {
+            if (!hasPendingExchange())
+                futQ.add(new RebalanceReassignExchangeTask(exchId, fut));
         }
 
         /**
@@ -3048,11 +3045,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             if (!futQ.isEmpty()) {
                 for (CachePartitionExchangeWorkerTask task : futQ) {
                     if (task instanceof GridDhtPartitionsExchangeFuture) {
-                        // First event is enough to check,
-                        // because only current exchange future can have multiple discovery events (exchange merge).
-                        ClusterNode triggeredBy = ((GridDhtPartitionsExchangeFuture) task).firstEvent().eventNode();
-
-                        if (!triggeredBy.isClient())
+                        if (((GridDhtPartitionsExchangeFuture)task).changedAffinity())
                             return true;
                     }
                 }
@@ -3173,8 +3166,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                         continue;
                     }
 
-                    busy = true;
-
                     Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
 
                     boolean forcePreload = false;
@@ -3185,197 +3176,220 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                     AffinityTopologyVersion resVer = null;
 
-                    try {
-                        if (isCancelled())
-                            break;
+                    if (isCancelled())
+                        break;
 
-                        if (task instanceof RebalanceReassignExchangeTask)
-                            exchId = ((RebalanceReassignExchangeTask) task).exchangeId();
-                        else if (task instanceof ForceRebalanceExchangeTask) {
-                            forcePreload = true;
+                    if (task instanceof RebalanceReassignExchangeTask) {
+                        RebalanceReassignExchangeTask reassignTask = (RebalanceReassignExchangeTask)task;
 
-                            timeout = 0; // Force refresh.
+                        exchId = reassignTask.exchangeId();
 
-                            exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
-                        }
-                        else {
-                            assert task instanceof GridDhtPartitionsExchangeFuture : task;
+                        GridDhtPartitionsExchangeFuture fut = reassignTask.future();
 
-                            exchFut = (GridDhtPartitionsExchangeFuture)task;
+                        assert fut.changedAffinity() :
+                            "Reassignment request started for exchange future which didn't change affinity " +
+                                "[exchId=" + exchId + ", fut=" + exchFut + ']';
 
-                            exchId = exchFut.exchangeId();
+                        if (fut.hasInapplicableNodesForRebalance()) {
+                              GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
 
-                            lastInitializedFut = exchFut;
+                              AffinityTopologyVersion lastAffChangedVer = cctx.exchange().
+                                  lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
 
-                            boolean newCrd = false;
+                             if (fut.topologyVersion().equals(lastAffChangedVer))
+                                  exchFut = fut;
+                             else if (lastAffChangedVer.after(exchId.topologyVersion())) {
+                                // There is a new exchange which should trigger rebalancing.
+                                // This reassignment request can be skipped.
+                                if (log.isInfoEnabled()) {
+                                    log.info("Partitions reassignment request skipped due to affinity was already changed" +
+                                        " [reassignTopVer=" + exchId.topologyVersion() +
+                                        ", lastAffChangedTopVer=" + lastAffChangedVer + ']');
+                                }
 
-                            if (!crd) {
-                                List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
+                                continue;
+                             }
+                         }
+                    }
+                    else if (task instanceof ForceRebalanceExchangeTask) {
+                        forcePreload = true;
 
-                                crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
-                            }
+                        timeout = 0; // Force refresh.
 
-                            if (!exchFut.changedAffinity()) {
-                                GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
+                        exchId = ((ForceRebalanceExchangeTask)task).exchangeId();
+                    }
+                    else {
+                        assert task instanceof GridDhtPartitionsExchangeFuture : task;
 
-                                if (lastFut != null) {
-                                    if (!lastFut.changedAffinity()) {
-                                        // If lastFut corresponds to merged exchange, it is essential to use
-                                        // topologyVersion() instead of initialVersion() - nodes joined in this PME
-                                        // will have DiscoCache only for the last version.
-                                        AffinityTopologyVersion lastAffVer = cctx.exchange()
-                                            .lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
+                        exchFut = (GridDhtPartitionsExchangeFuture)task;
 
-                                        cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
-                                            lastAffVer);
-                                    }
-                                    else
-                                        cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
-                                            lastFut.topologyVersion());
+                        exchId = exchFut.exchangeId();
+
+                        lastInitializedFut = exchFut;
+
+                        boolean newCrd = false;
+
+                        if (!crd) {
+                            List<ClusterNode> srvNodes = exchFut.firstEventCache().serverNodes();
+
+                            crd = newCrd = !srvNodes.isEmpty() && srvNodes.get(0).isLocal();
+                        }
+
+                        if (!exchFut.changedAffinity()) {
+                            GridDhtPartitionsExchangeFuture lastFut = lastFinishedFut.get();
+
+                            if (lastFut != null) {
+                                if (!lastFut.changedAffinity()) {
+                                    // If lastFut corresponds to merged exchange, it is essential to use
+                                    // topologyVersion() instead of initialVersion() - nodes joined in this PME
+                                    // will have DiscoCache only for the last version.
+                                    AffinityTopologyVersion lastAffVer = cctx.exchange()
+                                        .lastAffinityChangedTopologyVersion(lastFut.topologyVersion());
+
+                                    cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+                                        lastAffVer);
                                 }
+                                else
+                                    cctx.exchange().lastAffinityChangedTopologyVersion(exchFut.initialVersion(),
+                                        lastFut.topologyVersion());
                             }
+                        }
 
-                            exchFut.timeBag().finishGlobalStage("Waiting in exchange queue");
+                        exchFut.timeBag().finishGlobalStage("Waiting in exchange queue");
 
-                            exchFut.init(newCrd);
+                        exchFut.init(newCrd);
 
-                            int dumpCnt = 0;
+                        int dumpCnt = 0;
 
-                            long waitStartNanos = System.nanoTime();
+                        long waitStartNanos = System.nanoTime();
 
-                            // Call rollback logic only for client node, for server nodes
-                            // rollback logic is in GridDhtPartitionsExchangeFuture.
-                            boolean txRolledBack = !cctx.localNode().isClient();
+                        // Call rollback logic only for client node, for server nodes
+                        // rollback logic is in GridDhtPartitionsExchangeFuture.
+                        boolean txRolledBack = !cctx.localNode().isClient();
 
-                            IgniteConfiguration cfg = cctx.gridConfig();
+                        IgniteConfiguration cfg = cctx.gridConfig();
 
-                            final long dumpTimeout = 2 * cfg.getNetworkTimeout();
+                        final long dumpTimeout = 2 * cfg.getNetworkTimeout();
 
-                            long nextDumpTime = 0;
+                        long nextDumpTime = 0;
 
-                            while (true) {
-                                // Read txTimeoutOnPME from configuration after every iteration.
-                                long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+                        while (true) {
+                            // Read txTimeoutOnPME from configuration after every iteration.
+                            long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
+
+                            try {
+                                long exchTimeout = curTimeout > 0 && !txRolledBack
+                                    ? Math.min(curTimeout, dumpTimeout)
+                                    : dumpTimeout;
+
+                                blockingSectionBegin();
 
                                 try {
-                                    long exchTimeout = curTimeout > 0 && !txRolledBack
-                                        ? Math.min(curTimeout, dumpTimeout)
-                                        : dumpTimeout;
+                                    resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS);
+                                } finally {
+                                    blockingSectionEnd();
+                                }
 
-                                    blockingSectionBegin();
+                                onIdle();
+
+                                break;
+                            }
+                            catch (IgniteFutureTimeoutCheckedException ignored) {
+                                updateHeartbeat();
+
+                                if (nextDumpTime <= U.currentTimeMillis()) {
+                                    U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
+                                        "topVer=" + exchFut.initialVersion() +
+                                        ", node=" + cctx.localNodeId() + "]. " +
+                                        (curTimeout <= 0 && !txRolledBack ? "Consider changing " +
+                                        "TransactionConfiguration.txTimeoutOnPartitionMapExchange" +
+                                        " to non default value to avoid this message. " : "") +
+                                        "Dumping pending objects that might be the cause: ");
 
                                     try {
-                                        resVer = exchFut.get(exchTimeout, TimeUnit.MILLISECONDS);
-                                    } finally {
-                                        blockingSectionEnd();
+                                        dumpDebugInfo(exchFut);
                                     }
-
-                                    onIdle();
-
-                                    break;
-                                }
-                                catch (IgniteFutureTimeoutCheckedException ignored) {
-                                    updateHeartbeat();
-
-                                    if (nextDumpTime <= U.currentTimeMillis()) {
-                                        U.warn(diagnosticLog, "Failed to wait for partition map exchange [" +
-                                            "topVer=" + exchFut.initialVersion() +
-                                            ", node=" + cctx.localNodeId() + "]. " +
-                                            (curTimeout <= 0 && !txRolledBack ? "Consider changing " +
-                                            "TransactionConfiguration.txTimeoutOnPartitionMapExchange" +
-                                            " to non default value to avoid this message. " : "") +
-                                            "Dumping pending objects that might be the cause: ");
-
-                                        try {
-                                            dumpDebugInfo(exchFut);
-                                        }
-                                        catch (Exception e) {
-                                            U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
-                                        }
-
-                                        nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+                                    catch (Exception e) {
+                                        U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
                                     }
 
-                                    long passedMillis = U.millisSinceNanos(waitStartNanos);
+                                    nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, dumpTimeout);
+                                }
 
-                                    if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) {
-                                        txRolledBack = true; // Try automatic rollback only once.
+                                long passedMillis = U.millisSinceNanos(waitStartNanos);
 
-                                        cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
-                                    }
-                                }
-                                catch (Exception e) {
-                                    if (exchFut.reconnectOnError(e))
-                                        throw new IgniteNeedReconnectException(cctx.localNode(), e);
+                                if (!txRolledBack && curTimeout > 0 && passedMillis >= curTimeout) {
+                                    txRolledBack = true; // Try automatic rollback only once.
 
-                                    throw e;
+                                    cctx.tm().rollbackOnTopologyChange(exchFut.initialVersion());
                                 }
                             }
+                            catch (Exception e) {
+                                if (exchFut.reconnectOnError(e))
+                                    throw new IgniteNeedReconnectException(cctx.localNode(), e);
 
-                            removeMergedFutures(resVer, exchFut);
+                                throw e;
+                            }
+                        }
 
-                            if (log.isTraceEnabled())
-                                log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
-                                    this + ']');
+                        removeMergedFutures(resVer, exchFut);
 
-                            if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
-                                lastRefresh.compareAndSet(-1, U.currentTimeMillis());
+                        if (log.isTraceEnabled())
+                            log.trace("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+                                this + ']');
 
-                            // Just pick first worker to do this, so we don't
-                            // invoke topology callback more than once for the
-                            // same event.
+                        if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
+                            lastRefresh.compareAndSet(-1, U.currentTimeMillis());
 
-                            boolean changed = false;
+                        // Just pick first worker to do this, so we don't
+                        // invoke topology callback more than once for the
+                        // same event.
+                        boolean changed = false;
 
-                            for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                                if (grp.isLocal())
-                                    continue;
+                        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                            if (grp.isLocal())
+                                continue;
 
-                                changed |= grp.topology().afterExchange(exchFut);
-                            }
+                            changed |= grp.topology().afterExchange(exchFut);
+                        }
 
-                            if (!cctx.kernalContext().clientNode() && changed) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Refresh partitions due to mapping was changed");
+                        if (!cctx.kernalContext().clientNode() && changed) {
+                            if (log.isDebugEnabled())
+                                log.debug("Refresh partitions due to mapping was changed");
 
-                                refreshPartitions();
-                            }
+                            refreshPartitions();
                         }
+                    }
 
-                        if (rebalanceRequired(exchFut)) {
-                            if (rebalanceDelay > 0)
-                                U.sleep(rebalanceDelay);
+                    if (rebalanceRequired(exchFut)) {
+                        if (rebalanceDelay > 0)
+                            U.sleep(rebalanceDelay);
 
-                            assignsMap = new HashMap<>();
+                        assignsMap = new HashMap<>();
 
-                            IgniteCacheSnapshotManager snp = cctx.snapshot();
+                        IgniteCacheSnapshotManager snp = cctx.snapshot();
 
-                            for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                                long delay = grp.config().getRebalanceDelay();
+                        for (final CacheGroupContext grp : cctx.cache().cacheGroups()) {
+                            long delay = grp.config().getRebalanceDelay();
 
-                                boolean disableRebalance = snp.partitionsAreFrozen(grp);
+                            boolean disableRebalance = snp.partitionsAreFrozen(grp);
 
-                                GridDhtPreloaderAssignments assigns = null;
+                            GridDhtPreloaderAssignments assigns = null;
 
-                                // Don't delay for dummy reassigns to avoid infinite recursion.
-                                if ((delay == 0 || forcePreload) && !disableRebalance)
-                                    assigns = grp.preloader().generateAssignments(exchId, exchFut);
+                            // Don't delay for dummy reassigns to avoid infinite recursion.
+                            if ((delay == 0 || forcePreload) && !disableRebalance)
+                                assigns = grp.preloader().generateAssignments(exchId, exchFut);
 
-                                assignsMap.put(grp.groupId(), assigns);
+                            assignsMap.put(grp.groupId(), assigns);
 
-                                if (resVer == null && !grp.isLocal())
-                                    resVer = grp.topology().readyTopologyVersion();
-                            }
+                            if (resVer == null && !grp.isLocal())
+                                resVer = grp.topology().readyTopologyVersion();
                         }
-
-                        if (resVer == null)
-                            resVer = exchId.topologyVersion();
-                    }
-                    finally {
-                        // Must flip busy flag before assignments are given to demand workers.
-                        busy = false;
                     }
 
+                    if (resVer == null)
+                        resVer = exchId.topologyVersion();
+
                     if (!F.isEmpty(assignsMap)) {
                         int size = assignsMap.size();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c70f86b..3422a16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -76,8 +76,9 @@ public interface GridCachePreloader {
      * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs.
      * @return Partition assignments which will be requested from supplier nodes.
      */
-    @Nullable public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
-                                                                     @Nullable GridDhtPartitionsExchangeFuture exchFut);
+    @Nullable public GridDhtPreloaderAssignments generateAssignments(
+        GridDhtPartitionExchangeId exchId,
+        @Nullable GridDhtPartitionsExchangeFuture exchFut);
 
     /**
      * Adds assignments to preloader.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b382ba4..a98d192 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -144,8 +144,9 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
-                                                                     GridDhtPartitionsExchangeFuture exchFut) {
+    @Override public GridDhtPreloaderAssignments generateAssignments(
+        GridDhtPartitionExchangeId exchId,
+        GridDhtPartitionsExchangeFuture exchFut) {
         return null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 5277085..514dd1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -327,7 +327,7 @@ public class GridDhtPartitionDemander {
                 return null;
             }
 
-            final RebalanceFuture fut = new RebalanceFuture(grp, assignments, log, rebalanceId, next, lastCancelledTime);
+            final RebalanceFuture fut = new RebalanceFuture(grp, lastExchangeFut, assignments, log, rebalanceId, next, lastCancelledTime);
 
             if (!grp.localWalEnabled()) {
                 fut.listen(new IgniteInClosureX<IgniteInternalFuture<Boolean>>() {
@@ -482,7 +482,7 @@ public class GridDhtPartitionDemander {
 
             if (node == null) {
                 if (log.isDebugEnabled())
-                    log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+                    log.debug("Supply message ignored (supplier has left cluster) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
                 return;
             }
@@ -490,13 +490,13 @@ public class GridDhtPartitionDemander {
             // Topology already changed (for the future that supply message based on).
             if (!fut.isActual(supplyMsg.rebalanceId())) {
                 if (log.isDebugEnabled())
-                    log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+                    log.debug("Supply message ignored (topology changed) [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
                 return;
             }
 
             if (log.isDebugEnabled())
-                log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + "]");
+                log.debug("Received supply message [" + demandRoutineInfo(nodeId, supplyMsg) + ']');
 
             // Check whether there were error during supply message unmarshalling process.
             if (supplyMsg.classError() != null) {
@@ -616,7 +616,7 @@ public class GridDhtPartitionDemander {
 
                             if (log.isDebugEnabled())
                                 log.debug("Skipping rebalancing partition (state is not MOVING): " +
-                                    "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+                                    '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
                         }
                     }
                     else {
@@ -624,7 +624,7 @@ public class GridDhtPartitionDemander {
 
                         if (log.isDebugEnabled())
                             log.debug("Skipping rebalancing partition (affinity changed): " +
-                                "[" + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + "]");
+                                '[' + demandRoutineInfo(nodeId, supplyMsg) + ", p=" + p + ']');
                     }
                 }
 
@@ -662,7 +662,7 @@ public class GridDhtPartitionDemander {
                 else {
                     if (log.isDebugEnabled())
                         log.debug("Will not request next demand message [" + demandRoutineInfo(nodeId, supplyMsg) +
-                            ", rebalanceFuture=" + fut + "]");
+                            ", rebalanceFuture=" + fut + ']');
                 }
             }
             catch (IgniteSpiException | IgniteCheckedException e) {
@@ -985,14 +985,14 @@ public class GridDhtPartitionDemander {
      * Internal states of rebalance future.
      */
     private enum RebalanceFutureState {
-        /** Init. */
+        /** Initial state. */
         INIT,
 
-        /** Started. */
+        /** Rebalance future started and requested required partitions. */
         STARTED,
 
-        /** Marked as cancelled. */
-        MARK_CANCELLED,
+        /** Marked as cancelled. This means partitions will not be requested. */
+        MARK_CANCELLED
     }
 
     /**
@@ -1018,13 +1018,17 @@ public class GridDhtPartitionDemander {
         /** Remaining. */
         private final Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = new HashMap<>();
 
-        /** Missed. */
+        /** Collection of missed partitions and partitions that could not be rebalanced from a supplier. */
         private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
 
         /** Exchange ID. */
         @GridToStringExclude
         private final GridDhtPartitionExchangeId exchId;
 
+        /** Coresponding exchange future. */
+        @GridToStringExclude
+        private final GridDhtPartitionsExchangeFuture exchFut;
+
         /** Topology version. */
         private final AffinityTopologyVersion topVer;
 
@@ -1076,7 +1080,10 @@ public class GridDhtPartitionDemander {
         private final Map<ClusterNode, Set<Integer>> rebalancingParts;
 
         /**
-         * @param grp Cache group.
+         * Creates a new rebalance future.
+         *
+         * @param grp Cache group context.
+         * @param exchFut Exchange future.
          * @param assignments Assignments.
          * @param log Logger.
          * @param rebalanceId Rebalance id.
@@ -1085,17 +1092,21 @@ public class GridDhtPartitionDemander {
          */
         RebalanceFuture(
             CacheGroupContext grp,
+            GridDhtPartitionsExchangeFuture exchFut,
             GridDhtPreloaderAssignments assignments,
             IgniteLogger log,
             long rebalanceId,
             RebalanceFuture next,
-            AtomicLong lastCancelledTime) {
+            AtomicLong lastCancelledTime
+        ) {
             assert assignments != null;
+            assert assignments != null : "Asiignments must not be null.";
 
             this.rebalancingParts = U.newHashMap(assignments.size());
             this.assignments = assignments;
             exchId = assignments.exchangeId();
             topVer = assignments.topologyVersion();
+            this.exchFut = exchFut;
             this.next = next;
 
             this.lastCancelledTime = lastCancelledTime;
@@ -1142,6 +1153,7 @@ public class GridDhtPartitionDemander {
             this.assignments = null;
             this.exchId = null;
             this.topVer = null;
+            this.exchFut = null;
             this.ctx = null;
             this.grp = null;
             this.log = null;
@@ -1476,6 +1488,19 @@ public class GridDhtPartitionDemander {
             if (isDone())
                 return;
 
+            IgniteDhtDemandedPartitionsMap parts = remaining.get(nodeId);
+
+            assert parts != null : "Remaining not found [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
+                ", part=" + p + "]";
+
+            if (parts.historicalMap().contains(p)) {
+                // The partition p cannot be wal rebalanced,
+                // let's exclude the given nodeId and give a try to full rebalance.
+                exchFut.markNodeAsInapplicableForHistoricalRebalance(nodeId);
+            }
+            else
+                exchFut.markNodeAsInapplicableForFullRebalance(nodeId, grp.groupId(), p);
+
             missed.computeIfAbsent(nodeId, k -> new HashSet<>());
 
             missed.get(nodeId).add(p);
@@ -1611,7 +1636,7 @@ public class GridDhtPartitionDemander {
 
                     onDone(false); // Finished but has missed partitions, will force dummy exchange
 
-                    ctx.exchange().forceReassign(exchId);
+                    ctx.exchange().forceReassign(exchId, exchFut);
 
                     return;
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 8dfb7c2..d054b05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -47,7 +49,9 @@ import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -214,6 +218,15 @@ class GridDhtPartitionSupplier {
 
         SupplyContext sctx = null;
 
+        Set<Integer> remainingParts = null;
+
+        GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
+            demandMsg.rebalanceId(),
+            grp.groupId(),
+            demandMsg.topologyVersion(),
+            grp.deploymentEnabled()
+        );
+
         try {
             synchronized (scMap) {
                 sctx = scMap.remove(contextId);
@@ -257,15 +270,6 @@ class GridDhtPartitionSupplier {
             else
                 maxBatchesCnt = 1;
 
-            GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
-                demandMsg.rebalanceId(),
-                grp.groupId(),
-                demandMsg.topologyVersion(),
-                grp.deploymentEnabled()
-            );
-
-            Set<Integer> remainingParts;
-
             if (sctx == null || sctx.iterator == null) {
                 iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
 
@@ -455,42 +459,65 @@ class GridDhtPartitionSupplier {
             }
             else
                 U.error(log, "Failed to continue supplying ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t);
 
             try {
                 if (sctx != null)
                     clearContext(sctx, log);
-                else if (iter != null)
-                    iter.close();
             }
             catch (Throwable t1) {
                 U.error(log, "Failed to cleanup supplying context ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
             }
 
             if (!sendErrMsg)
                 return;
 
+            boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class);
+
             try {
-                GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2(
-                    demandMsg.rebalanceId(),
-                    grp.groupId(),
-                    demandMsg.topologyVersion(),
-                    grp.deploymentEnabled(),
-                    t
-                );
+                GridDhtPartitionSupplyMessage errMsg;
+
+                if (fallbackToFullRebalance) {
+                    // Mark the last checkpoint as not applicable for WAL rebalance.
+                    grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
+
+                    // Mark all remaining partitions as missed to trigger full rebalance.
+                    if (iter == null && F.isEmpty(remainingParts)) {
+                        remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
+                        remainingParts.addAll(demandMsg.partitions().historicalSet());
+                    }
+
+                    for (int p : Optional.ofNullable(remainingParts).orElseGet(Collections::emptySet))
+                        supplyMsg.missed(p);
+
+                    errMsg = supplyMsg;
+                }
+                else {
+                    errMsg = new GridDhtPartitionSupplyMessageV2(
+                        demandMsg.rebalanceId(),
+                        grp.groupId(),
+                        demandMsg.topologyVersion(),
+                        grp.deploymentEnabled(),
+                        t
+                    );
+                }
 
                 reply(topicId, demanderNode, demandMsg, errMsg, contextId);
             }
             catch (Throwable t1) {
                 U.error(log, "Failed to send supply error message ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1);
+                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
             }
 
-            grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
-                new IgniteCheckedException("Failed to continue supplying ["
-                    + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t)
-            ));
+            // If fallback to full rebalance is possible then let's try to switch to it
+            // instead of triggering failure handler.
+            if (!fallbackToFullRebalance) {
+                grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
+                    new IgniteCheckedException("Failed to continue supplying ["
+                        + supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t)
+                ));
+            }
         }
     }
 
@@ -537,7 +564,7 @@ class GridDhtPartitionSupplier {
      * @param demander Recipient of supply message.
      * @param demandMsg Demand message.
      * @param supplyMsg Supply message.
-     * @param contextId Supply context id.
+     * @param ctxId Supply context id.
      * @return {@code True} if message was sent, {@code false} if recipient left grid.
      * @throws IgniteCheckedException If failed.
      */
@@ -546,7 +573,7 @@ class GridDhtPartitionSupplier {
         ClusterNode demander,
         GridDhtPartitionDemandMessage demandMsg,
         GridDhtPartitionSupplyMessage supplyMsg,
-        T3<UUID, Integer, AffinityTopologyVersion> contextId
+        T3<UUID, Integer, AffinityTopologyVersion> ctxId
     ) throws IgniteCheckedException {
         try {
             if (log.isDebugEnabled())
@@ -567,7 +594,7 @@ class GridDhtPartitionSupplier {
                 log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
 
             synchronized (scMap) {
-                clearContext(scMap.remove(contextId), log);
+                clearContext(scMap.remove(ctxId), log);
             }
 
             return false;
@@ -588,21 +615,21 @@ class GridDhtPartitionSupplier {
     /**
      * Saves supply context with given parameters to {@code scMap}.
      *
-     * @param contextId Supply context id.
+     * @param ctxId Supply context id.
      * @param entryIt Entries rebalance iterator.
      * @param remainingParts Set of partitions that weren't sent yet.
      * @param rebalanceId Rebalance id.
      */
     private void saveSupplyContext(
-        T3<UUID, Integer, AffinityTopologyVersion> contextId,
+        T3<UUID, Integer, AffinityTopologyVersion> ctxId,
         IgniteRebalanceIterator entryIt,
         Set<Integer> remainingParts,
         long rebalanceId
     ) {
         synchronized (scMap) {
-            assert scMap.get(contextId) == null;
+            assert scMap.get(ctxId) == null;
 
-            scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
+            scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId));
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index c940813..a34a098 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -313,6 +313,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     @GridToStringExclude
     private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
 
+    /** Set of nodes that cannot be used for wal rebalancing due to some reason. */
+    private Set<UUID> exclusionsFromHistoricalRebalance = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    /**
+     * Set of nodes that cannot be used for full rebalancing due missed partitions.
+     * Mapping pair of groupId and nodeId to set of partitions.
+     */
+    private Map<T2<Integer, UUID>, Set<Integer>> exclusionsFromFullRebalance = new ConcurrentHashMap<>();
+
     /** Reserved max available history for calculation of history supplier on coordinator. */
     private volatile Map<Integer /** Group. */, Map<Integer /** Partition */, Long /** Counter. */>> partHistReserved;
 
@@ -537,7 +546,68 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @return List of IDs of history supplier nodes or empty list if these doesn't exist.
      */
     @Nullable public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
-        return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
+        List<UUID> histSuppliers = partHistSuppliers.getSupplier(grpId, partId, cntrSince);
+
+        return histSuppliers.stream().filter((supplier) -> !exclusionsFromHistoricalRebalance.contains(supplier))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * Marks the given node as not applicable for historical rebalancing.
+     *
+     * @param nodeId Node id that should not be used for wal rebalancing (aka historical supplier).
+     */
+    public void markNodeAsInapplicableForHistoricalRebalance(UUID nodeId) {
+        exclusionsFromHistoricalRebalance.add(nodeId);
+    }
+
+    /**
+     * Marks the given node as not applicable for full rebalancing
+     * for the given group and partition.
+     *
+     * @param nodeId Node id that should not be used for full rebalancing.
+     * @param grpId Cache group id.
+     * @param p Partition id.
+     */
+    public void markNodeAsInapplicableForFullRebalance(UUID nodeId, int grpId, int p) {
+        Set<Integer> parts = exclusionsFromFullRebalance.computeIfAbsent(new T2<>(grpId, nodeId), t2 ->
+            Collections.newSetFromMap(new ConcurrentHashMap<>())
+        );
+
+        parts.add(p);
+    }
+
+    /**
+     * @return {@code true} if there are nodes which are inapplicable for historical rebalancing.
+     */
+    public boolean hasInapplicableNodesForHistoricalRebalance() {
+        return !exclusionsFromHistoricalRebalance.isEmpty();
+    }
+
+    /**
+     * @return {@code true} if there are nodes which are inapplicable for full rebalancing.
+     */
+    public boolean hasInapplicableNodesForFullRebalance() {
+        return !exclusionsFromFullRebalance.isEmpty();
+    }
+
+    /**
+     * @return {@code true} if there are nodes which are inapplicable for rebalancing.
+     */
+    public boolean hasInapplicableNodesForRebalance() {
+        return hasInapplicableNodesForHistoricalRebalance() || hasInapplicableNodesForFullRebalance();
+    }
+
+    /**
+     * @param nodeId Node id to check.
+     * @param grpId Cache group id.
+     * @param p Partition id.
+     * @return {@code true} if the node is applicable for full rebalancing.
+     */
+    public boolean isNodeApplicableForFullRebalance(UUID nodeId, int grpId, int p) {
+        return Optional.ofNullable(exclusionsFromFullRebalance.get(new T2<>(grpId, nodeId)))
+            .map(s -> !s.contains(p))
+            .orElse(true);
     }
 
     /**
@@ -2696,6 +2766,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
         newCrdFut = null;
         exchangeLocE = null;
         exchangeGlobalExceptions.clear();
+        exclusionsFromHistoricalRebalance.clear();
+        exclusionsFromFullRebalance.clear();
         if (finishState != null)
             finishState.cleanUp();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 23ff455..bdaae9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
@@ -186,7 +187,9 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         AffinityTopologyVersion topVer = top.readyTopologyVersion();
 
-        assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
+        assert exchFut == null ||
+            exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) ||
+            exchFut.context().events().topologyVersion().equals(ctx.exchange().lastAffinityChangedTopologyVersion(top.readyTopologyVersion())) :
             "Topology version mismatch [exchId=" + exchId +
                 ", grp=" + grp.name() +
                 ", topVer=" + top.readyTopologyVersion() + ']';
@@ -275,7 +278,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                         addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
                 }
                 else {
-                    List<ClusterNode> picked = remoteOwners(p, topVer);
+                    int partId = p;
+                    List<ClusterNode> picked = remoteOwners(p, topVer, node -> {
+                        if (exchFut != null && !exchFut.isNodeApplicableForFullRebalance(node.id(), grp.groupId(), partId))
+                            return false;
+
+                        return true;
+                    });
 
                     if (!picked.isEmpty()) {
                         ClusterNode n = picked.get(p % picked.size());
@@ -319,12 +328,24 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
      * @return Nodes owning this partition.
      */
     private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
+        return remoteOwners(p, topVer, node -> true);
+    }
+
+    /**
+     * Returns remote owners (excluding local node) for specified partition {@code p}
+     * which is additionally filtered by the specified predicate.
+     *
+     * @param p Partition.
+     * @param topVer Topology version.
+     * @return Nodes owning this partition.
+     */
+    private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer, IgnitePredicate<ClusterNode> pred) {
         List<ClusterNode> owners = grp.topology().owners(p, topVer);
 
         List<ClusterNode> res = new ArrayList<>(owners.size());
 
         for (ClusterNode owner : owners) {
-            if (!owner.id().equals(ctx.localNodeId()))
+            if (!owner.id().equals(ctx.localNodeId()) && pred.apply(owner))
                 res.add(owner);
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
index 7e473be..5b641b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteHistoricalIteratorException.java
@@ -13,37 +13,36 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.IgniteException;
 
 /**
- *
+ * Thrown when {@link IgniteHistoricalIterator} cannot iterate over WAL for some reason.
  */
-public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask {
+public class IgniteHistoricalIteratorException extends IgniteException {
     /** */
-    private final GridDhtPartitionExchangeId exchId;
+    private static final long serialVersionUID = 0L;
 
     /**
-     * @param exchId Exchange ID.
+     * Creates a new exception with the specified cause.
+     *
+     * @param cause Cause.
      */
-    public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
-        assert exchId != null;
-
-        this.exchId = exchId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipForExchangeMerge() {
-        return true;
+    public IgniteHistoricalIteratorException(Throwable cause) {
+        super(cause);
     }
 
     /**
-     * @return Exchange ID.
+     * Creates a new exception with the specified message and cause.
+     *
+     * @param msg Detail message.
+     * @param cause Cause.
      */
-    public GridDhtPartitionExchangeId exchangeId() {
-        return exchId;
+    public IgniteHistoricalIteratorException(String msg, Throwable cause) {
+        super(msg, cause);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
index 7e473be..5cffcb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java
@@ -26,13 +26,19 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
     /** */
     private final GridDhtPartitionExchangeId exchId;
 
+    /** */
+    private final GridDhtPartitionsExchangeFuture exchFut;
+
     /**
      * @param exchId Exchange ID.
+     * @param exchFut Exchange future.
      */
-    public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) {
+    public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) {
         assert exchId != null;
+        assert exchFut != null;
 
         this.exchId = exchId;
+        this.exchFut = exchFut;
     }
 
     /** {@inheritDoc} */
@@ -46,4 +52,11 @@ public class RebalanceReassignExchangeTask implements CachePartitionExchangeWork
     public GridDhtPartitionExchangeId exchangeId() {
         return exchId;
     }
+
+    /**
+     * @return Exchange future.
+     */
+    public GridDhtPartitionsExchangeFuture future() {
+        return exchFut;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 4f325a2..aa0520c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
@@ -105,6 +106,7 @@ import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.lang.IgnitePredicateX;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1002,7 +1004,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
     /** {@inheritDoc} */
     @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
-        CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException {
+        CachePartitionPartialCountersMap partCntrs,
+        Set<Integer> missing
+    ) throws IgniteCheckedException {
         if (partCntrs == null || partCntrs.isEmpty())
             return null;
 
@@ -1022,14 +1026,22 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
         FileWALPointer minPtr = (FileWALPointer)database.checkpointHistory().searchEarliestWalPointer(grp.groupId(), partsCounters);
 
-        WALIterator it = grp.shared().wal().replay(minPtr);
+        try {
+            WALIterator it = grp.shared().wal().replay(minPtr);
+
+            WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, it);
 
-        WALHistoricalIterator iterator = new WALHistoricalIterator(log, grp, partCntrs, it);
+            // Add historical partitions which are unabled to reserve to missing set.
+            missing.addAll(histIt.missingParts);
 
-        // Add historical partitions which are unabled to reserve to missing set.
-        missing.addAll(iterator.missingParts);
+            return histIt;
+        }
+        catch (Exception ex) {
+            if (!X.hasCause(ex, IgniteHistoricalIteratorException.class))
+                throw new IgniteHistoricalIteratorException(ex);
 
-        return iterator;
+            throw ex;
+        }
     }
 
     /** {@inheritDoc} */
@@ -1349,92 +1361,98 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
          *
          */
         private void advance() {
-            next = null;
+            try {
+                next = null;
 
-            outer: while (doneParts.size() != partMap.size()) {
-                if (entryIt != null) {
-                    while (entryIt.hasNext()) {
-                        DataEntry entry = entryIt.next();
+                outer:
+                while (doneParts.size() != partMap.size()) {
+                    if (entryIt != null) {
+                        while (entryIt.hasNext()) {
+                            DataEntry entry = entryIt.next();
 
-                        if (cacheIds.contains(entry.cacheId())) {
-                            int idx = partMap.partitionIndex(entry.partitionId());
+                            if (cacheIds.contains(entry.cacheId())) {
+                                int idx = partMap.partitionIndex(entry.partitionId());
 
-                            if (idx < 0 || missingParts.contains(idx))
-                                continue;
+                                if (idx < 0 || missingParts.contains(idx))
+                                    continue;
 
-                            long from = partMap.initialUpdateCounterAt(idx);
-                            long to = partMap.updateCounterAt(idx);
+                                long from = partMap.initialUpdateCounterAt(idx);
+                                long to = partMap.updateCounterAt(idx);
 
-                            if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
-                                // Partition will be marked as done for current entry on next iteration.
-                                if (++rebalancedCntrs[idx] == to)
-                                    donePart = entry.partitionId();
+                                if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
+                                    // Partition will be marked as done for current entry on next iteration.
+                                    if (++rebalancedCntrs[idx] == to)
+                                        donePart = entry.partitionId();
 
-                                next = entry;
+                                    next = entry;
 
-                                return;
+                                    return;
+                                }
                             }
                         }
                     }
-                }
 
-                entryIt = null;
+                    entryIt = null;
 
-                // Search for next DataEntry while applying rollback counters.
-                while (walIt.hasNext()) {
-                    IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
+                    // Search for next DataEntry while applying rollback counters.
+                    while (walIt.hasNext()) {
+                        IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
 
-                    if (rec.get2() instanceof DataRecord) {
-                        DataRecord data = (DataRecord)rec.get2();
+                        if (rec.get2() instanceof DataRecord) {
+                            DataRecord data = (DataRecord)rec.get2();
 
-                        entryIt = data.writeEntries().iterator();
+                            entryIt = data.writeEntries().iterator();
 
-                        // Move on to the next valid data entry.
-                        continue outer;
-                    }
-                    else if (rec.get2() instanceof RollbackRecord) {
-                        RollbackRecord rbRec = (RollbackRecord)rec.get2();
+                            // Move on to the next valid data entry.
+                            continue outer;
+                        }
+                        else if (rec.get2() instanceof RollbackRecord) {
+                            RollbackRecord rbRec = (RollbackRecord)rec.get2();
 
-                        if (grp.groupId() == rbRec.groupId()) {
-                            int idx = partMap.partitionIndex(rbRec.partitionId());
+                            if (grp.groupId() == rbRec.groupId()) {
+                                int idx = partMap.partitionIndex(rbRec.partitionId());
 
-                            if (idx < 0 || missingParts.contains(idx))
-                                continue;
+                                if (idx < 0 || missingParts.contains(idx))
+                                    continue;
 
-                            long from = partMap.initialUpdateCounterAt(idx);
-                            long to = partMap.updateCounterAt(idx);
+                                long from = partMap.initialUpdateCounterAt(idx);
+                                long to = partMap.updateCounterAt(idx);
 
-                            rebalancedCntrs[idx] += rbRec.overlap(from, to);
+                                rebalancedCntrs[idx] += rbRec.overlap(from, to);
 
-                            if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("Partition done [grpId=" + grp.groupId() +
-                                        ", partId=" + donePart +
-                                        ", from=" + from +
-                                        ", to=" + to + ']');
-                                }
+                                if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug("Partition done [grpId=" + grp.groupId() +
+                                            ", partId=" + donePart +
+                                            ", from=" + from +
+                                            ", to=" + to + ']');
+                                    }
 
-                                doneParts.add(rbRec.partitionId()); // Add to done set immediately.
+                                    doneParts.add(rbRec.partitionId()); // Add to done set immediately.
+                                }
                             }
                         }
                     }
-                }
 
-                if (entryIt == null && doneParts.size() != partMap.size()) {
-                    for (int i = 0; i < partMap.size(); i++) {
-                        int p = partMap.partitionAt(i);
+                    if (entryIt == null && doneParts.size() != partMap.size()) {
+                        for (int i = 0; i < partMap.size(); i++) {
+                            int p = partMap.partitionAt(i);
 
-                        if (!doneParts.contains(p)) {
-                            log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
-                                    (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
+                            if (!doneParts.contains(p)) {
+                                log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
+                                        (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
 
-                            doneParts.add(p);
-                        }
-                    }
+                                    doneParts.add(p);
+                                }
+                            }
 
-                    return;
+                        return;
+                    }
                 }
             }
+            catch (Exception ex) {
+                throw new IgniteHistoricalIteratorException(ex);
+            }
         }
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
index 2bc7fc7..2452662 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgniteShutdownOnSupplyMessageFailureTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
@@ -44,10 +45,13 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecora
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
+
 /** */
 public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstractTest {
     /** Rebalance cache name. */
@@ -84,6 +88,8 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract
         if (name.equals(getTestIgniteInstanceName(NODE_NAME_WITH_TEST_FILE_FACTORY))) {
             conf.setFileIOFactory(new FailingFileIOFactory(canFailFirstNode));
 
+            cfg.setIncludeEventTypes(EVT_CACHE_REBALANCE_PART_SUPPLIED);
+
             cfg.setFailureHandler(new TestFailureHandler());
         }
         else
@@ -132,8 +138,19 @@ public class IgniteShutdownOnSupplyMessageFailureTest extends GridCommonAbstract
 
         populateCache(ig, TEST_REBALANCE_CACHE, 3_000, 6_000);
 
+        // Breaks historical rebalance. The second node will try to switch to full rebalance.
         canFailFirstNode.set(true);
 
+        // Break full rebalance.
+        IgnitePredicate<CacheRebalancingEvent> locLsnr = evt -> {
+            if (TEST_REBALANCE_CACHE.equals(evt.cacheName()))
+                throw new AssertionError(new IOException("Test crash"));
+
+            return true;
+        };
+
+        ig.events().localListen(locLsnr, EVT_CACHE_REBALANCE_PART_SUPPLIED);
+
         startGrid(1);
 
         WAIT_ON_SUPPLY_MESSAGE_FAILURE.await();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 02bbf6d..191be72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -19,20 +19,28 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.file.OpenOption;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.ClusterNodeAttributeAffinityBackupFilter;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
@@ -41,31 +49,48 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCachePreloader;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.WalTestUtils;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
 
 /**
@@ -79,11 +104,17 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     private static final int PARTS_CNT = 32;
 
     /** Block message predicate to set to Communication SPI in node configuration. */
-    private IgniteBiPredicate<ClusterNode, Message> blockMessagePredicate;
+    private IgniteBiPredicate<ClusterNode, Message> blockMsgPred;
+
+    /** Record message predicate to set to Communication SPI in node configuration. */
+    private IgniteBiPredicate<ClusterNode, Message> recordMsgPred;
 
     /** */
     private int backups;
 
+    /** User attributes. */
+    private Map<String, Serializable> userAttrs = new HashMap<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         System.setProperty(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0"); //to make all rebalance wal-based
@@ -116,11 +147,15 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         cfg.setCommunicationSpi(new WalRebalanceCheckingCommunicationSpi());
 
-        if (blockMessagePredicate != null) {
-            TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) cfg.getCommunicationSpi();
+        if (blockMsgPred != null)
+            ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).blockMessages(blockMsgPred);
 
-            spi.blockMessages(blockMessagePredicate);
-        }
+        if (recordMsgPred != null)
+            ((TestRecordingCommunicationSpi) cfg.getCommunicationSpi()).record(recordMsgPred);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+        cfg.setConsistentId(gridName);
+        cfg.setUserAttributes(userAttrs);
 
         return cfg;
     }
@@ -344,7 +379,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         backups = 4;
 
         // Prepare some data.
-        IgniteEx crd = (IgniteEx) startGrids(3);
+        IgniteEx crd = startGrids(3);
 
         crd.cluster().active(true);
 
@@ -362,7 +397,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         stopAllGrids();
 
         // Rewrite data with globally disabled WAL.
-        crd = (IgniteEx) startGrids(2);
+        crd = startGrids(2);
 
         crd.cluster().active(true);
 
@@ -426,7 +461,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         backups = 4;
 
         // Prepare some data.
-        IgniteEx crd = (IgniteEx) startGrids(3);
+        IgniteEx crd = startGrids(3);
 
         crd.cluster().active(true);
 
@@ -444,7 +479,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         stopAllGrids();
 
         // Rewrite data to trigger further rebalance.
-        IgniteEx supplierNode = (IgniteEx) startGrid(0);
+        IgniteEx supplierNode = startGrid(0);
 
         supplierNode.cluster().active(true);
 
@@ -455,12 +490,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         forceCheckpoint();
 
-        final int groupId = supplierNode.cachex(CACHE_NAME).context().groupId();
+        final int grpId = supplierNode.cachex(CACHE_NAME).context().groupId();
 
         // Delay rebalance process for specified group.
-        blockMessagePredicate = (node, msg) -> {
+        blockMsgPred = (node, msg) -> {
             if (msg instanceof GridDhtPartitionDemandMessage)
-                return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId;
+                return ((GridDhtPartitionDemandMessage) msg).groupId() == grpId;
 
             return false;
         };
@@ -478,11 +513,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         );
 
         // Inject I/O factory which can throw exception during WAL read on supplier node.
-        FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory());
-
-        ((FileWriteAheadLogManager) supplierNode.cachex(CACHE_NAME).context().shared().wal()).setFileIOFactory(ioFactory);
-
-        ioFactory.throwExceptionOnWalRead();
+        FailingIOFactory ioFactory = injectFailingIOFactory(supplierNode);
 
         // Resume rebalance process.
         TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi) demanderNode.configuration().getCommunicationSpi();
@@ -490,12 +521,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         spi.stopBlock();
 
         // Wait till rebalance will be failed and cancelled.
-        Boolean result = preloader.rebalanceFuture().get();
+        Boolean res = preloader.rebalanceFuture().get();
 
-        Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, result);
+        Assert.assertEquals("Rebalance should be cancelled on demander node: " + preloader.rebalanceFuture(), false, res);
 
         // Stop blocking messages and fail WAL during read.
-        blockMessagePredicate = null;
+        blockMsgPred = null;
 
         ioFactory.reset();
 
@@ -514,6 +545,445 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Tests that demander switches to full rebalance if the previously chosen two of three of suppliers
+     * for a group have failed to perform historical rebalance due to an unexpected error.
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    @WithSystemProperty(key = "IGNITE_DISABLE_WAL_DURING_REBALANCING", value = "true")
+    public void testMultipleNodesFailHistoricalRebalance() throws Exception {
+        backups = 1;
+        int node_cnt = 4;
+        int demanderId = node_cnt - 1;
+
+        // Start a new cluster with 3 suppliers.
+        startGrids(node_cnt - 1);
+
+        // Start demander node.
+        userAttrs.put("TEST_ATTR", "TEST_ATTR");
+        startGrid(node_cnt - 1);
+
+        grid(0).cluster().active(true);
+
+        // Create a new cache that places a full set of partitions on demander node.
+        RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, PARTS_CNT);
+        aff.setAffinityBackupFilter(new ClusterNodeAttributeAffinityBackupFilter("TEST_ATTR"));
+
+        String cacheName = "test-cache-1";
+        IgniteCache<Integer, IndexedObject> cache0 = grid(0).getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>(cacheName)
+                .setBackups(backups)
+                .setAffinity(aff)
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC));
+
+        // Fill initial data and force checkpoint.
+        final int entryCnt = PARTS_CNT * 200;
+        for (int k = 0; k < entryCnt; k++)
+            cache0.put(k, new IndexedObject(k));
+
+        forceCheckpoint();
+
+        // Stop demander node.
+        stopGrid(demanderId);
+
+        // Rewrite data to trigger further rebalance.
+        for (int k = 0; k < entryCnt; k++) {
+            // Should skip one random partition to be sure that after restarting demander node,
+            // it will have at least one partition in OWNING state, and so WAL will not be disabled while rebalancing.
+            // This fact allows moving partitions to OWNING state during rebalancing
+            // even though the corresponding RebalanceFuture will be cancelled.
+            if (grid(0).affinity(cacheName).partition(k) != 12)
+                cache0.put(k, new IndexedObject(k));
+        }
+
+        forceCheckpoint();
+
+        // Delay rebalance process for specified group.
+        blockMsgPred = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(cacheName);
+            }
+
+            return false;
+        };
+
+        Queue<RecordedDemandMessage> recorderedMsgs = new ConcurrentLinkedQueue<>();
+
+        // Record demand messages for specified group.
+        recordMsgPred = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                if (msg0.groupId() == CU.cacheId(cacheName)) {
+                    recorderedMsgs.add(new RecordedDemandMessage(
+                        node.id(),
+                        msg0.groupId(),
+                        msg0.partitions().hasFull(),
+                        msg0.partitions().hasHistorical()));
+                }
+            }
+
+            return false;
+        };
+
+        // Corrupt WAL on suppliers, except the one.
+        injectFailingIOFactory(grid(0));
+        injectFailingIOFactory(grid(1));
+
+        // Trigger rebalance process from suppliers.
+        IgniteEx restartedDemander = startGrid(node_cnt - 1);
+
+        TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(restartedDemander);
+
+        // Wait until demander starts historical rebalancning.
+        demanderSpi.waitForBlocked();
+
+        final IgniteInternalFuture<Boolean> preloadFut = restartedDemander.cachex(cacheName).context().group()
+            .preloader().rebalanceFuture();
+
+        // Unblock messages and start tracking demand and supply messages.
+        demanderSpi.stopBlock();
+
+        // Wait until rebalancing will be cancelled for both suppliers.
+        assertTrue(
+            "Rebalance future was not cancelled [fut=" + preloadFut + ']',
+            GridTestUtils.waitForCondition(preloadFut::isDone, getTestTimeout()));
+
+        Assert.assertEquals(
+            "Rebalance should be cancelled on demander node: " + preloadFut,
+            false,
+            preloadFut.get());
+
+        awaitPartitionMapExchange(true, true, null);
+
+        // Check data consistency.
+        assertPartitionsSame(idleVerify(restartedDemander, cacheName));
+
+        // Check that historical rebalance switched to full for supplier 1 & 2 and it was historical for supplier3.
+        IgnitePredicate<RecordedDemandMessage> histPred = msg ->
+            msg.hasHistorical() && !msg.hasFull();
+
+        IgnitePredicate<RecordedDemandMessage> fullPred = msg ->
+            !msg.hasHistorical() && msg.hasFull();
+
+        IgniteInClosure<UUID> supplierChecker = supplierId -> {
+            List<RecordedDemandMessage> demandMsgsForSupplier = recorderedMsgs.stream()
+                // Filter messages correspond to the supplierId
+                .filter(msg -> msg.supplierId().equals(supplierId))
+                .filter(msg -> msg.groupId() == CU.cacheId(cacheName))
+                // Filter out intermediate messages
+                .filter(msg -> msg.hasFull() || msg.hasHistorical())
+                .collect(toList());
+
+            assertEquals("There should only two demand messages [supplierId=" + supplierId + ']',
+                2,
+                demandMsgsForSupplier.size());
+            assertTrue(
+                "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']',
+                histPred.apply(demandMsgsForSupplier.get(0)));
+            assertTrue(
+                "The second message should require full rebalance [msg=" + demandMsgsForSupplier.get(0) + ']',
+                fullPred.apply(demandMsgsForSupplier.get(1)));
+        };
+
+        supplierChecker.apply(grid(0).cluster().localNode().id());
+        supplierChecker.apply(grid(1).cluster().localNode().id());
+
+        // Check supplier3
+        List<RecordedDemandMessage> demandMsgsForSupplier = recorderedMsgs.stream()
+            // Filter messages correspond to the supplier3
+            .filter(msg -> msg.supplierId().equals(grid(2).cluster().localNode().id()))
+            .filter(msg -> msg.groupId() == CU.cacheId(cacheName))
+            // Filter out intermediate messages
+            .filter(msg -> msg.hasFull() || msg.hasHistorical())
+            .collect(toList());
+
+        assertEquals("There should only one demand message.", 1, demandMsgsForSupplier.size());
+        assertTrue(
+            "The first message should require historical rebalance [msg=" + demandMsgsForSupplier.get(0) + ']',
+            histPred.apply(demandMsgsForSupplier.get(0)));
+    }
+
+
+    /**
+     * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while historical iterator (wal iterator) is created.
+     * Additionally, the client node joins the cluster between the demand message sent, and the supply message received.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSwitchHistoricalRebalanceToFullAndClientJoin() throws Exception {
+        testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, true);
+    }
+
+    /**
+     * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while historical iterator (wal iterator) is created.
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    public void testSwitchHistoricalRebalanceToFullDueToFailOnCreatingWalIterator() throws Exception {
+        testSwitchHistoricalRebalanceToFull(IgniteWalRebalanceTest::injectFailingIOFactory, false);
+    }
+
+    /**
+     * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error while iterating over reserved wal.
+     *
+     * @throws Exception If failed
+     */
+    @Test
+    public void testSwitchHistoricalRebalanceToFullWhileIteratingOverWAL() throws Exception {
+        testSwitchHistoricalRebalanceToFull(supplier1 -> {
+            try {
+                // Corrupt wal record in order to fail historical rebalance from supplier1 node.
+                IgniteWriteAheadLogManager walMgr = supplier1.context().cache().context().wal();
+
+                FileWALPointer ptr = (FileWALPointer)walMgr.log(new DataRecord(new DataEntry(
+                    CU.cacheId("test-cache-1"),
+                    new KeyCacheObjectImpl(0, null, 0),
+                    null,
+                    GridCacheOperation.DELETE,
+                    new GridCacheVersion(0, 1, 1, 0),
+                    new GridCacheVersion(0, 1, 1, 0),
+                    0,
+                    0,
+                    0
+                )));
+
+                File walDir = U.field(walMgr, "walWorkDir");
+
+                List<FileDescriptor> walFiles = new IgniteWalIteratorFactory().resolveWalFiles(
+                    new IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(walDir));
+
+                FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1);
+
+                WalTestUtils.corruptWalSegmentFile(lastWalFile, ptr);
+
+                IgniteCache<Integer, IndexedObject> c1 = supplier1.cache("test-cache-1");
+                for (int i = 0; i < PARTS_CNT * 100; i++)
+                    c1.put(i, new IndexedObject(i));
+            }
+            catch (IgniteCheckedException | IOException e) {
+                throw new RuntimeException(e);
+            }
+        }, false);
+    }
+
+    /**
+     * Tests that demander switches to full rebalance if the previously chosen supplier for a group has failed
+     * to perform historical rebalance due to an unexpected error.
+     *
+     * @param corruptWalClo Closure that corrupts wal iterating on supplier node.
+     * @param needClientStart {@code true} if client node should join the cluster between
+     *                                    the demand message sent and the supply message received.
+     * @throws Exception If failed
+     */
+    public void testSwitchHistoricalRebalanceToFull(
+        IgniteInClosure<IgniteEx> corruptWalClo,
+        boolean needClientStart
+    ) throws Exception {
+        backups = 3;
+
+        IgniteEx supplier1 = startGrid(0);
+        IgniteEx supplier2 = startGrid(1);
+        IgniteEx demander = startGrid(2);
+
+        supplier1.cluster().active(true);
+
+        String supplier1Name = supplier1.localNode().consistentId().toString();
+        String supplier2Name = supplier2.localNode().consistentId().toString();
+        String demanderName = demander.localNode().consistentId().toString();
+
+        String cacheName1 = "test-cache-1";
+        String cacheName2 = "test-cache-2";
+
+        // Cache resides on supplier1 and demander nodes.
+        IgniteCache<Integer, IndexedObject> c1 = supplier1.getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>(cacheName1)
+                .setBackups(backups)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setRebalanceOrder(10)
+                .setNodeFilter(n -> n.consistentId().equals(supplier1Name) || n.consistentId().equals(demanderName)));
+
+        // Cache resides on supplier2 and demander nodes.
+        IgniteCache<Integer, IndexedObject> c2 = supplier1.getOrCreateCache(
+            new CacheConfiguration<Integer, IndexedObject>("test-cache-2")
+                .setBackups(backups)
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT))
+                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+                .setRebalanceOrder(20)
+                .setNodeFilter(n -> n.consistentId().equals(supplier2Name) || n.consistentId().equals(demanderName)));
+
+        // Fill initial data.
+        final int entryCnt = PARTS_CNT * 200;
+        for (int k = 0; k < entryCnt; k++) {
+            c1.put(k, new IndexedObject(k));
+
+            c2.put(k, new IndexedObject(k));
+        }
+
+        forceCheckpoint();
+
+        stopGrid(2);
+
+        // Rewrite data to trigger further rebalance.
+        for (int i = 0; i < entryCnt; i++) {
+            c1.put(i, new IndexedObject(i));
+
+            c2.put(i, new IndexedObject(i));
+        }
+
+        // Delay rebalance process for specified groups.
+        blockMsgPred = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                return msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2);
+            }
+
+            return false;
+        };
+
+        Queue<RecordedDemandMessage> recorderedMsgs = new ConcurrentLinkedQueue<>();
+
+        // Record demand messages for specified groups.
+        recordMsgPred = (node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
+
+                if (msg0.groupId() == CU.cacheId(cacheName1) || msg0.groupId() == CU.cacheId(cacheName2)) {
+                    recorderedMsgs.add(new RecordedDemandMessage(
+                        node.id(),
+                        msg0.groupId(),
+                        msg0.partitions().hasFull(),
+                        msg0.partitions().hasHistorical()));
+                }
+            }
+
+            return false;
+        };
+
+        // Delay rebalance process for specified group from supplier2.
+        TestRecordingCommunicationSpi supplierSpi2 = TestRecordingCommunicationSpi.spi(supplier2);
+        supplierSpi2.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionSupplyMessage) {
+                GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
+
+                return node.consistentId().equals(demanderName) && msg0.groupId() == CU.cacheId(cacheName2);
+            }
+
+            return false;
+        });
+
+        // Corrupt WAL on supplier1
+        corruptWalClo.apply(supplier1);
+
+        // Trigger rebalance process from suppliers.
+        IgniteEx restartedDemander = startGrid(2);
+
+        recordMsgPred = null;
+        blockMsgPred = null;
+
+        TestRecordingCommunicationSpi demanderSpi = TestRecordingCommunicationSpi.spi(grid(2));
+
+        // Wait until demander starts historical rebalancning.
+        demanderSpi.waitForBlocked();
+
+        final IgniteInternalFuture<Boolean> preloadFut1 = restartedDemander.cachex(cacheName1).context().group()
+            .preloader().rebalanceFuture();
+        final IgniteInternalFuture<Boolean> preloadFut2 = restartedDemander.cachex(cacheName2).context().group()
+            .preloader().rebalanceFuture();
+
+        if (needClientStart)
+            startClientGrid(3);
+
+        // Unblock messages and start tracking demand and supply messages.
+        demanderSpi.stopBlock();
+
+        // Wait until rebalancing will be cancelled for both suppliers.
+        GridTestUtils.waitForCondition(() -> preloadFut1.isDone() && preloadFut2.isDone(), getTestTimeout());
+
+        Assert.assertEquals(
+            "Rebalance should be cancelled on demander node: " + preloadFut1,
+            false,
+            preloadFut1.get());
+        Assert.assertEquals(
+            "Rebalance should be cancelled on demander node: " + preloadFut2,
+            false,
+            preloadFut2.get());
+
+        // Unblock supply messages from supplier2
+        supplierSpi2.stopBlock();
+
+        awaitPartitionMapExchange(true, true, null);
+
+        // Check data consistency.
+        assertPartitionsSame(idleVerify(restartedDemander, cacheName2, cacheName1));
+
+        // Check that historical rebalance switched to full for supplier1 and it is still historical for supplier2.
+        IgnitePredicate<RecordedDemandMessage> histPred = (msg) ->
+            msg.hasHistorical() && !msg.hasFull();
+
+        IgnitePredicate<RecordedDemandMessage> fullPred = (msg) ->
+            !msg.hasHistorical() && msg.hasFull();
+
+        // Supplier1
+        List<RecordedDemandMessage> demandMsgsForSupplier1 = recorderedMsgs.stream()
+            // Filter messages correspond to the supplier1
+            .filter(msg -> msg.groupId() == CU.cacheId(cacheName1))
+            // Filter out intermediate messages
+            .filter(msg -> msg.hasFull() || msg.hasHistorical())
+            .collect(toList());
+
+        assertEquals("There should only two demand messages.", 2, demandMsgsForSupplier1.size());
+        assertTrue(
+            "The first message should require historical rebalance [msg=" + demandMsgsForSupplier1.get(0) + ']',
+            histPred.apply(demandMsgsForSupplier1.get(0)));
+        assertTrue(
+            "The second message should require full rebalance [msg=" + demandMsgsForSupplier1.get(0) + ']',
+            fullPred.apply(demandMsgsForSupplier1.get(1)));
+
+        // Supplier2
+        List<RecordedDemandMessage> demandMsgsForSupplier2 = recorderedMsgs.stream()
+            // Filter messages correspond to the supplier2
+            .filter(msg -> msg.groupId() == CU.cacheId(cacheName2))
+            // Filter out intermediate messages
+            .filter(msg -> msg.hasFull() || msg.hasHistorical())
+            .collect(toList());
+
+        assertEquals("There should only two demand messages.", 2, demandMsgsForSupplier2.size());
+        assertTrue(
+            "Both messages should require historical rebalance [" +
+                "msg=" + demandMsgsForSupplier2.get(0) + ", msg=" + demandMsgsForSupplier2.get(1) + ']',
+                histPred.apply(demandMsgsForSupplier2.get(0)) && histPred.apply(demandMsgsForSupplier2.get(1)));
+    }
+
+    /**
+     * Injects a new instance of FailingIOFactory into wal manager for the given supplier node.
+     * This allows to break historical rebalance fo=rom the supplier.
+     *
+     * @param supplier Supplier node to be modified.
+     * @return Instance of FailingIOFactory that was injected.
+     */
+    private static FailingIOFactory injectFailingIOFactory(IgniteEx supplier) {
+        // Inject I/O factory which can throw exception during WAL read on supplier1 node.
+        FailingIOFactory ioFactory = new FailingIOFactory(new RandomAccessFileIOFactory());
+
+        ((FileWriteAheadLogManager)supplier.context().cache().context().wal()).setFileIOFactory(ioFactory);
+
+        ioFactory.throwExceptionOnWalRead();
+
+        return ioFactory;
+    }
+
+    /**
      *
      */
     private static class IndexedObject {
@@ -641,7 +1111,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         for (int i = entryCnt / 2; i < entryCnt; i++)
             cache0.put(i, String.valueOf(i));
 
-        blockMessagePredicate = (node, msg) -> {
+        blockMsgPred = (node, msg) -> {
             if (msg instanceof GridDhtPartitionDemandMessage) {
                 GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
 
@@ -664,7 +1134,7 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         // Wait until the full rebalance begins with g1 as a supplier.
         spi2.waitForBlocked(2);
 
-        blockMessagePredicate = null;
+        blockMsgPred = null;
 
         startGrid(0); // Should not force rebalancing remap.
 
@@ -725,4 +1195,71 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
             failRead = false;
         }
     }
+
+    /** */
+    static class RecordedDemandMessage {
+        /** Full rebalance. */
+        private final boolean full;
+
+        /** Historical rebalance. */
+        private final boolean historical;
+
+        /** Supplier node id. */
+        private final UUID supplierId;
+
+        /** Group id. */
+        private final int grpId;
+
+        /**
+         * Creates a new instance.
+         * @param supplierId Supplier node id.
+         * @param grpId Cache group id.
+         * @param full {@code true} if demand message has partitions that should be fully rebalanced.
+         * @param historical {@code true} if demand message has partitions that should be wal rebalanced.
+         */
+        RecordedDemandMessage(UUID supplierId, int grpId, boolean full, boolean historical) {
+            this.supplierId = supplierId;
+            this.grpId = grpId;
+            this.full = full;
+            this.historical = historical;
+        }
+
+        /**
+         * @return Supplier node id.
+         */
+        UUID supplierId() {
+            return supplierId;
+        }
+
+        /**
+         * @return cache group id.
+         */
+        int groupId() {
+            return grpId;
+        }
+
+        /**
+         * @return {@code true} if demand message has partitions that should be fully rebalanced.
+         */
+        boolean hasFull() {
+            return full;
+        }
+
+        /**
+         * @return {@code true} if demand message has partitions that should be wal rebalanced.
+         */
+        boolean hasHistorical() {
+            return historical;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "RecordedDemandMessage{" +
+                "supplierId=" + supplierId +
+                ", groupId=" + grpId +
+                ", full=" + full +
+                ", historical=" + historical +
+                '}';
+        }
+    }
 }