You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/06/23 12:35:53 UTC

[ignite] branch master updated: IGNITE-13168 Retrigger historical rebalance if it was cancelled in case WAL history is still available - Fixes #7948.

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new c915cea  IGNITE-13168 Retrigger historical rebalance if it was cancelled in case WAL history is still available - Fixes #7948.
c915cea is described below

commit c915ceaa2d12f73ae653f894d7159176aed7f9a3
Author: vd_pyatkov <vl...@gmail.com>
AuthorDate: Tue Jun 23 15:35:09 2020 +0300

    IGNITE-13168 Retrigger historical rebalance if it was cancelled in case WAL history is still available - Fixes #7948.
    
    Signed-off-by: Ivan Rakov <iv...@gmail.com>
---
 .../preloader/GridDhtPartitionsExchangeFuture.java | 167 +++++++----
 .../dht/preloader/GridDhtPreloader.java            |   9 +-
 .../IgniteDhtPartitionHistorySuppliersMap.java     |  15 +-
 .../db/wal/WalRebalanceRestartTest.java            | 321 +++++++++++++++++++++
 .../TxPartitionCounterStateConsistencyTest.java    | 121 ++++----
 .../ignite/testsuites/IgnitePdsTestSuite4.java     |   2 +
 6 files changed, 513 insertions(+), 122 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4f9e1dd..c940813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -27,8 +27,10 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -532,9 +534,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * @param grpId Cache group ID.
      * @param partId Partition ID.
      * @param cntrSince Partition update counter since history supplying is requested.
-     * @return ID of history supplier node or null if it doesn't exist.
+     * @return List of IDs of history supplier nodes or empty list if none exist.
      */
-    @Nullable public UUID partitionHistorySupplier(int grpId, int partId, long cntrSince) {
+    @Nullable public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
         return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
     }
 
@@ -3274,10 +3276,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * Collects and determines new owners of partitions for all nodes for given {@code top}.
      *
      * @param top Topology to assign.
+     * @param resetOwners True if partition owners need to be reset according to update counters, false otherwise.
      */
-    private void assignPartitionStates(GridDhtPartitionTopology top) {
+    private void assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
         Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
-        Map<Integer, Long> minCntrs = new HashMap<>();
+        Map<Integer, TreeSet<Long>> varCntrs = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
             CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(),
@@ -3288,9 +3291,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             for (int i = 0; i < nodeCntrs.size(); i++) {
                 int p = nodeCntrs.partitionAt(i);
 
-                UUID uuid = e.getKey();
+                UUID remoteNodeId = e.getKey();
 
-                GridDhtPartitionState state = top.partitionState(uuid, p);
+                GridDhtPartitionState state = top.partitionState(remoteNodeId, p);
 
                 if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
                     continue;
@@ -3299,10 +3302,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                     nodeCntrs.initialUpdateCounterAt(i) :
                     nodeCntrs.updateCounterAt(i);
 
-                Long minCntr = minCntrs.get(p);
-
-                if (minCntr == null || minCntr > cntr)
-                    minCntrs.put(p, cntr);
+                varCntrs.computeIfAbsent(p, key -> new TreeSet<>()).add(cntr);
 
                 if (state != GridDhtPartitionState.OWNING)
                     continue;
@@ -3310,9 +3310,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 CounterWithNodes maxCntr = maxCntrs.get(p);
 
                 if (maxCntr == null || cntr > maxCntr.cnt)
-                    maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), uuid));
+                    maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), remoteNodeId));
                 else if (cntr == maxCntr.cnt)
-                    maxCntr.nodes.add(uuid);
+                    maxCntr.nodes.add(remoteNodeId);
             }
         }
 
@@ -3323,12 +3323,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
                 continue;
 
-            final long cntr = state == GridDhtPartitionState.MOVING ? part.initialUpdateCounter() : part.updateCounter();
-
-            Long minCntr = minCntrs.get(part.id());
+            final long cntr = state == GridDhtPartitionState.MOVING ?
+                part.initialUpdateCounter() :
+                part.updateCounter();
 
-            if (minCntr == null || minCntr > cntr)
-                minCntrs.put(part.id(), cntr);
+            varCntrs.computeIfAbsent(part.id(), key -> new TreeSet<>()).add(cntr);
 
             if (state != GridDhtPartitionState.OWNING)
                 continue;
@@ -3351,68 +3350,114 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 maxCntr.nodes.add(cctx.localNodeId());
         }
 
+        Set<Integer> haveHistory = new HashSet<>();
+
+        assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
+
+        if (resetOwners)
+            resetOwnersByCounter(top, maxCntrs, haveHistory);
+    }
+
+    /**
+     * Determine new owners for partitions.
+     * If any OWNING partition has a counter less than the maximum, that partition is forcibly changed to MOVING state.
+     *
+     * @param top Topology.
+     * @param maxCntrs Max counter partition map.
+     * @param haveHistory Set of partitions which have historical supplier.
+     */
+    private void resetOwnersByCounter(GridDhtPartitionTopology top,
+        Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
+        Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
+        Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
+
+        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
+            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
+
+            partSizes.put(e.getKey(), e.getValue().size);
+        }
+
+        top.globalPartSizes(partSizes);
+
+        Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(
+            ownersByUpdCounters, haveHistory, this);
+
+        for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
+            UUID nodeId = e.getKey();
+            Set<Integer> parts = e.getValue();
+
+            for (int part : parts)
+                partsToReload.put(nodeId, top.groupId(), part);
+        }
+    }
+
+    /**
+     * Find and assign suppliers for history rebalance.
+     *
+     * @param top Topology.
+     * @param maxCntrs Max counter partition map.
+     * @param varCntrs Various counters for each partition.
+     * @param haveHistory Set of partitions which have historical supplier.
+     */
+    private void assignHistoricalSuppliers(
+        GridDhtPartitionTopology top,
+        Map<Integer, CounterWithNodes> maxCntrs,
+        Map<Integer, TreeSet<Long>> varCntrs,
+        Set<Integer> haveHistory
+    ) {
         Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
         Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
 
-        Set<Integer> haveHistory = new HashSet<>();
-
-        for (Map.Entry<Integer, Long> e : minCntrs.entrySet()) {
+        for (Map.Entry<Integer, TreeSet<Long>> e : varCntrs.entrySet()) {
             int p = e.getKey();
-            long minCntr = e.getValue();
 
             CounterWithNodes maxCntrObj = maxCntrs.get(p);
 
             long maxCntr = maxCntrObj != null ? maxCntrObj.cnt : 0;
 
-            // If minimal counter is zero, do clean preloading.
-            if (minCntr == 0 || minCntr == maxCntr)
+            NavigableSet<Long> nonMaxCntrs = e.getValue().headSet(maxCntr, false);
+
+            // If no counter is less than the maximum then a historical supplier is not necessary.
+            if (nonMaxCntrs.isEmpty())
                 continue;
 
+            T2<UUID, Long> deepestReserved = new T2<>(null, Long.MAX_VALUE);
+
             if (localReserved != null) {
                 Long localHistCntr = localReserved.get(p);
 
-                if (localHistCntr != null && localHistCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) {
-                    partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localHistCntr);
+                if (localHistCntr != null && maxCntrObj.nodes.contains(cctx.localNodeId())) {
+                    Long ceilingMinReserved = nonMaxCntrs.ceiling(localHistCntr);
 
-                    haveHistory.add(p);
+                    if (ceilingMinReserved != null) {
+                        partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, ceilingMinReserved);
 
-                    continue;
+                        haveHistory.add(p);
+                    }
+
+                    if (deepestReserved.get2() > localHistCntr)
+                        deepestReserved.set(cctx.localNodeId(), localHistCntr);
                 }
             }
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
                 Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
 
-                if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) {
-                    partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);
+                if (histCntr != null && maxCntrObj.nodes.contains(e0.getKey())) {
+                    Long ceilingMinReserved = nonMaxCntrs.ceiling(histCntr);
 
-                    haveHistory.add(p);
+                    if (ceilingMinReserved != null) {
+                        partHistSuppliers.put(e0.getKey(), top.groupId(), p, ceilingMinReserved);
 
-                    break;
+                        haveHistory.add(p);
+                    }
+
+                    if (deepestReserved.get2() > histCntr)
+                        deepestReserved.set(e0.getKey(), histCntr);
                 }
             }
         }
-
-        Map<Integer, Set<UUID>> ownersByUpdCounters = new HashMap<>(maxCntrs.size());
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
-            ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
-        Map<Integer, Long> partSizes = new HashMap<>(maxCntrs.size());
-        for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
-            partSizes.put(e.getKey(), e.getValue().size);
-
-        top.globalPartSizes(partSizes);
-
-        Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory, this);
-
-        for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
-            UUID nodeId = e.getKey();
-            Set<Integer> parts = e.getValue();
-
-            for (int part : parts)
-                partsToReload.put(nodeId, top.groupId(), part);
-        }
     }
 
     /**
@@ -3692,7 +3737,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 assert firstDiscoEvt instanceof DiscoveryCustomEvent;
 
                 if (activateCluster() || changedBaseline())
-                    assignPartitionsStates();
+                    assignPartitionsStates(true);
 
                 DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent) firstDiscoEvt).customMessage();
 
@@ -3703,18 +3748,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         if (!F.isEmpty(caches))
                             resetLostPartitions(caches);
 
-                        assignPartitionsStates();
+                        assignPartitionsStates(true);
                     }
                 }
                 else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
                     && ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions()) {
                     markAffinityReassign();
 
-                    assignPartitionsStates();
+                    assignPartitionsStates(true);
                 }
             }
             else if (exchCtx.events().hasServerJoin())
-                assignPartitionsStates();
+                assignPartitionsStates(true);
+            else if (exchCtx.events().hasServerLeft())
+                assignPartitionsStates(false);
 
             // Recalculate new affinity based on partitions availability.
             if (!exchCtx.mergeExchanges() && forceAffReassignment) {
@@ -4002,9 +4049,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     *
+     * @param resetOwners True if reset partitions state needed, false otherwise.
      */
-    private void assignPartitionsStates() {
+    private void assignPartitionsStates(boolean resetOwners) {
         try {
             U.doInParallel(
                 cctx.kernalContext().getSystemExecutorService(),
@@ -4016,10 +4063,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                         ? grpCtx.topology()
                         : cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
 
-                    if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+                    if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+                        assignPartitionStates(top, resetOwners);
+                    else if (resetOwners)
                         assignPartitionSizes(top);
-                    else
-                        assignPartitionStates(top);
 
                     return null;
                 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 17a500b..23ff455 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -44,14 +44,15 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
 
 /**
@@ -249,10 +250,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 ClusterNode histSupplier = null;
 
                 if (grp.persistenceEnabled() && exchFut != null) {
-                    UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
+                    List<UUID> nodeIds = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
 
-                    if (nodeId != null)
-                        histSupplier = ctx.discovery().node(nodeId);
+                    if (!F.isEmpty(nodeIds))
+                        histSupplier = ctx.discovery().node(nodeIds.get(p % nodeIds.size()));
                 }
 
                 if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 6755f28..427aad8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -19,7 +19,10 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -50,11 +53,13 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
      * @param grpId Group ID.
      * @param partId Partition ID.
      * @param cntrSince Partition update counter since history supplying is requested.
-     * @return Supplier UUID.
+     * @return List of supplier UUIDs or empty list if there are none.
      */
-    @Nullable public synchronized UUID getSupplier(int grpId, int partId, long cntrSince) {
+    public synchronized List<UUID> getSupplier(int grpId, int partId, long cntrSince) {
         if (map == null)
-            return null;
+            return Collections.EMPTY_LIST;
+
+        List<UUID> suppliers = new ArrayList<>();
 
         for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet()) {
             UUID supplierNode = e.getKey();
@@ -62,10 +67,10 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
             Long historyCounter = e.getValue().get(new T2<>(grpId, partId));
 
             if (historyCounter != null && historyCounter <= cntrSince)
-                return supplierNode;
+                suppliers.add(supplierNode);
         }
 
-        return null;
+        return suppliers;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
new file mode 100644
index 0000000..4432c58
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test checks that historical rebalance can be restarted after being canceled for some reason.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+public class WalRebalanceRestartTest extends GridCommonAbstractTest {
+
+    /** Version of progressing rebalance. */
+    private volatile AffinityTopologyVersion rebTopVer = null;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setConsistentId(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setAffinity(new RendezvousAffinityFunction(false, 16))
+                .setBackups(2))
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setMaxSize(200L * 1024 * 1024)
+                    .setPersistenceEnabled(true)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Restart rebalance manually.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testForceReassignment() throws Exception {
+        restartRebalance((ignite) -> {
+            IgniteFuture manualRebFut = ignite.cache(DEFAULT_CACHE_NAME).rebalance();
+        }, false);
+    }
+
+    /**
+     * Restart rebalance when another server joined and baseline changed.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testAnotherServerJoinedAndChangeBlt() throws Exception {
+        restartRebalance((ignite) -> {
+            startGrid("new_srv");
+
+            resetBaselineTopology();
+        }, true);
+    }
+
+    /**
+     * Restart rebalance when another server joined.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testAnotherServerJoined() throws Exception {
+        restartRebalance((ignite) -> {
+            startGrid("new_srv");
+        }, true);
+    }
+
+    /**
+     * Restart rebalance when new cache started.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testCacheStarted() throws Exception {
+        restartRebalance((ignite) -> {
+            ignite.getOrCreateCache("new_" + DEFAULT_CACHE_NAME);
+        }, true);
+    }
+
+    /**
+     * Restart rebalance when one of the suppliers left the topology.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStopSupplier() throws Exception {
+        restartRebalance((ignite) -> {
+            stopFirstFoundSupplier(ignite);
+        }, true);
+    }
+
+    /**
+     * This test starts an empty node and stops one of the suppliers during a rebalance,
+     * so that historical rebalance has to recover twice.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStartNewNodeAndStopSupplier() throws Exception {
+        restartRebalance((ignite) -> {
+            startGrid("new_srv");
+
+            resetBaselineTopology();
+
+            waitForRebalanceOnLastDiscoTopology(ignite);
+
+            stopFirstFoundSupplier(ignite);
+        }, true);
+    }
+
+    /**
+     * Waiting for rebalancing on last topology which got through Discovery.
+     *
+     * @param ignite Ignite.
+     * @throws IgniteInterruptedCheckedException if failed.
+     */
+    private void waitForRebalanceOnLastDiscoTopology(IgniteEx ignite) throws IgniteInterruptedCheckedException {
+        AffinityTopologyVersion readyAffinity = ignite.context().cache().context().exchange().readyAffinityVersion();
+
+        assertTrue("Can not wait for rebalance topology [cur=" + rebTopVer + ", expect: " + readyAffinity + ']',
+            GridTestUtils.waitForCondition(() -> rebTopVer.equals(readyAffinity),
+                10_000));
+    }
+
+    /**
+     * Stop supplier and start new node.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testStopSupplierAndStartNewNode() throws Exception {
+        restartRebalance((ignite) -> {
+            stopFirstFoundSupplier(ignite);
+
+            waitForRebalanceOnLastDiscoTopology(ignite);
+
+            startGrid("new_srv");
+
+            resetBaselineTopology();
+        }, true);
+    }
+
+    /**
+     * Stop first found supplier for current rebalance on specific node.
+     *
+     * @param ignite Ignite.
+     */
+    private void stopFirstFoundSupplier(IgniteEx ignite) {
+        IgniteInternalFuture rebFut = ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture();
+
+        assertFalse(rebFut.isDone());
+
+        Map<UUID, IgniteDhtDemandedPartitionsMap> remainding = U.field(rebFut, "remaining");
+
+        assertFalse(remainding.isEmpty());
+
+        UUID supplierId = remainding.keySet().iterator().next();
+
+        info("First dupplier: " + supplierId);
+
+        for (Ignite ign : G.allGrids()) {
+            if (ign.cluster().localNode().id().equals(supplierId))
+                ign.close();
+        }
+    }
+
+    /**
+     * Hangs a rebalance on one node, invokes a trigger and checks its influence.
+     *
+     * @param retrigger Rebalance trigger.
+     * @param retriggerAsHistorical True means rebalance will be restarted as historical, false is as full.
+     * @throws Exception if failed.
+     */
+    private void restartRebalance(RebalanceRetrigger retrigger, boolean retriggerAsHistorical) throws Exception {
+        IgniteEx ignite0 = startGrids(4);
+
+        ignite0.cluster().active(true);
+
+        try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < 1000; i++)
+                streamer.addData(i, String.valueOf(i));
+        }
+
+        awaitPartitionMapExchange();
+        forceCheckpoint();
+
+        ignite(2).close();
+
+        try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 1000; i < 2000; i++)
+                streamer.addData(i, String.valueOf(i));
+        }
+
+        awaitPartitionMapExchange();
+        forceCheckpoint();
+
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(2));
+
+        TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        AtomicBoolean hasFullRebalance = new AtomicBoolean();
+
+        spi2.record((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+                if (CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId()) {
+                    if (rebTopVer == null || rebTopVer.before(demandMsg.topologyVersion()))
+                        rebTopVer = demandMsg.topologyVersion();
+
+                    if (!F.isEmpty(demandMsg.partitions().fullSet()))
+                        hasFullRebalance.compareAndSet(false, true);
+                }
+
+            }
+
+            return false;
+        });
+
+        spi2.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+                if (CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId())
+                    return true;
+            }
+
+            return false;
+
+        });
+
+        IgniteEx ignite2 = startGrid(optimize(cfg));
+
+        spi2.waitForBlocked();
+
+        assertFalse(hasFullRebalance.get());
+
+        retrigger.trigger(ignite2);
+
+        spi2.stopBlock();
+
+        awaitPartitionMapExchange();
+
+        if (retriggerAsHistorical)
+            assertFalse(hasFullRebalance.get());
+        else
+            assertTrue(hasFullRebalance.get());
+    }
+
+    /**
+     * Rebalance trigger interface.
+     */
+    private static interface RebalanceRetrigger {
+        /**
+         * Trigger some action.
+         *
+         * @param ignite Ignite.
+         * @throws Exception If issue happened.
+         */
+        public void trigger(IgniteEx ignite) throws Exception;
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 7dbc157..afe82f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -376,15 +376,17 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
 
         prim.cache(DEFAULT_CACHE_NAME).put(keys.get(0), keys.get(0));
 
-        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(prim);
+        TestRecordingCommunicationSpi spiPrim = TestRecordingCommunicationSpi.spi(prim);
+        TestRecordingCommunicationSpi spiBack = TestRecordingCommunicationSpi.spi(backups.get(1));
 
-        spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+        spiPrim.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+        spiBack.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
             try {
-                spi.waitForBlocked();
+                GridTestUtils.waitForCondition(() -> spiPrim.hasBlockedMessages() || spiBack.hasBlockedMessages(), 10_000);
             }
-            catch (InterruptedException e) {
+            catch (Exception e) {
                 fail(X.getFullStackTrace(e));
             }
 
@@ -393,7 +395,8 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
             doSleep(2000);
 
             // Ensure queue cleanup is triggered before releasing supply message.
-            spi.stopBlock();
+            spiPrim.stopBlock();
+            spiBack.stopBlock();
         });
 
         startGrid(backups.get(0).name());
@@ -440,33 +443,19 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
         cache.put(p1Keys.get(1), 2);
         cache.put(p2Keys.get(1), 1); // Will be fully rebalanced.
 
-        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
-
-        // Prevent rebalance completion.
-        spi.blockMessages((node, msg) -> {
-            String name = (String)node.attributes().get(ATTR_IGNITE_INSTANCE_NAME);
-
-            if (name.equals(backupName) && msg instanceof GridDhtPartitionSupplyMessage) {
-                GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
-
-                if (msg0.groupId() != CU.cacheId(DEFAULT_CACHE_NAME))
-                    return false;
-
-                Map<Integer, CacheEntryInfoCollection> infos = U.field(msg0, "infos");
-
-                return infos.keySet().contains(primaryParts[0]); // Delay historical rebalance.
-            }
+        List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
 
-            return false;
-        });
+        for (Ignite ignite: G.allGrids())
+            spis.add(blockSupplyFromNode(ignite, primaryParts, backupName));
 
         backup = startGrid(backupName);
 
-        spi.waitForBlocked();
+        GridTestUtils.waitForCondition(() -> spis.stream().anyMatch(TestRecordingCommunicationSpi::hasBlockedMessages),
+            10_000);
 
         forceCheckpoint(backup);
 
-        spi.stopBlock();
+        spis.stream().forEach(TestRecordingCommunicationSpi::stopBlock);
 
         // While message is delayed topology version shouldn't change to ideal.
         awaitPartitionMapExchange();
@@ -524,30 +513,26 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
 
         stopAllGrids();
 
-        crd = startGrid(0);
+        crd = startNodeWithBlockingSupplying(0);
         startGrid(1);
-        startGrid(2);
-
-        TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(crd);
+        startNodeWithBlockingSupplying(2);
 
-        // Block all rebalance from crd.
-        crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
-            @Override public boolean apply(ClusterNode node, Message msg) {
-                return msg instanceof GridDhtPartitionSupplyMessage;
-            }
-        });
+        crd.cluster().active(true);
 
-        crd.cluster().active(true); // Rebalancing is triggered on activation.
+        TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(crd);
+        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(ignite(2));
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
             try {
-                crdSpi.waitForBlocked();
+                GridTestUtils.waitForCondition(() -> spi0.hasBlockedMessages() || spi2.hasBlockedMessages(), 10_000);
 
                 // Stop before supplying rebalance. New rebalance must start with second backup as supplier
                 // doing full rebalance.
                 stopGrid(primName);
+
+                spi2.stopBlock();
             }
-            catch (InterruptedException e) {
+            catch (Exception e) {
                 fail();
             }
         });
@@ -572,6 +557,21 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
     }
 
     /**
+     * @param idx Index of the node to start; its configuration is built from the test instance name.
+     * @return Node started after a GridDhtPartitionSupplyMessage block was registered for instance name 1.
+     * @throws Exception If failed.
+     */
+    private Ignite startNodeWithBlockingSupplying(int idx) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
+
+        TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+        spi.blockMessages(GridDhtPartitionSupplyMessage.class, getTestIgniteInstanceName(1));
+
+        return startGrid(optimize(cfg));
+    }
+
+    /**
      * Tests reproduces the problem: if node joins after missing some updates no partition inconsistency happens.
      *
      * @throws Exception If failed.
@@ -1346,7 +1346,35 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
 
         cache.remove(keys.get(1));
 
-        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
+        List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
+
+        for (Ignite ignite: G.allGrids())
+            spis.add(blockSupplyFromNode(ignite, primaryParts, backupName));
+
+        startGrid(backupName);
+
+        GridTestUtils.waitForCondition(() -> spis.stream().anyMatch(TestRecordingCommunicationSpi::hasBlockedMessages),
+            10_000);
+
+        rebBlockClo.accept(backupName);
+
+        spis.stream().forEach(TestRecordingCommunicationSpi::stopBlock);
+
+        rebUnblockClo.accept(backupName);
+
+        awaitPartitionMapExchange();
+
+        assertPartitionsSame(idleVerify(crd, DEFAULT_CACHE_NAME));
+    }
+
+    /**
+     * @param ingine Ignite.
+     * @param primaryParts Array of partitions.
+     * @param backupName Name of node for which supply messages will be blocked.
+     * @return Test communication SPI.
+     */
+    private TestRecordingCommunicationSpi blockSupplyFromNode(Ignite ingine, int[] primaryParts, String backupName) {
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ingine);
 
         // Prevent rebalance completion.
         spi.blockMessages((node, msg) -> {
@@ -1365,20 +1393,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
 
             return false;
         });
-
-        startGrid(backupName);
-
-        spi.waitForBlocked();
-
-        rebBlockClo.accept(backupName);
-
-        spi.stopBlock();
-
-        rebUnblockClo.accept(backupName);
-
-        awaitPartitionMapExchange();
-
-        assertPartitionsSame(idleVerify(crd, DEFAULT_CACHE_NAME));
+        return spi;
     }
 
     /** */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index e9e25ca..17ca8a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsParti
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.HistoricalReservationTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRebalanceRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManagerTest;
 import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.SharedPageLockTrackerTest;
 import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessorTest;
@@ -86,6 +87,7 @@ public class IgnitePdsTestSuite4 {
         GridTestUtils.addTestIfNeeded(suite, RebalanceCancellationTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, NotOptimizedRebalanceTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, BreakRebalanceChainTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, WalRebalanceRestartTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);