You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/08/18 13:55:54 UTC

[ignite] branch master updated: IGNITE-13253 Advanced heuristics for historical rebalance - Fixes #8160

This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b82b650  IGNITE-13253 Advanced heuristics for historical rebalance - Fixes #8160
b82b650 is described below

commit b82b65050c50e8820bd1480a38aefb2d20e4b0f1
Author: Ivan Rakov <iv...@gmail.com>
AuthorDate: Tue Aug 18 16:55:10 2020 +0300

    IGNITE-13253 Advanced heuristics for historical rebalance - Fixes #8160
---
 .../org/apache/ignite/IgniteSystemProperties.java  |   7 +
 .../preloader/GridDhtPartitionsExchangeFuture.java |  83 +++++++---
 .../GridCacheDatabaseSharedManager.java            |  18 ++-
 .../apache/ignite/cache/CircledRebalanceTest.java  |  15 +-
 ...CachePartitionLostAfterSupplierHasLeftTest.java |  11 +-
 .../HistoricalRebalanceHeuristicsTest.java         | 172 +++++++++++++++++++++
 ...itePdsAtomicCacheHistoricalRebalancingTest.java |   4 +-
 .../IgnitePdsTxHistoricalRebalancingTest.java      |   4 +-
 .../db/FullHistRebalanceOnClientStopTest.java      |   3 +-
 .../db/wal/IgniteWalRebalanceLoggingTest.java      |   3 +-
 .../persistence/db/wal/IgniteWalRebalanceTest.java |  26 +++-
 ...ounterStateConsistencyHistoryRebalanceTest.java |   4 +-
 ...ounterStateConsistencyHistoryRebalanceTest.java |   4 +-
 ...ateOnePrimaryOneBackupHistoryRebalanceTest.java |   4 +-
 ...imaryTwoBackupsFailAllHistoryRebalanceTest.java |   4 +-
 ...teOnePrimaryTwoBackupsHistoryRebalanceTest.java |   4 +-
 .../DistributedMetaStoragePersistentTest.java      |   6 +-
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 18 files changed, 319 insertions(+), 56 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 18b351f..d1f59af 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -746,6 +746,13 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
 
+    /**
+     * Prefer historical rebalance if there's enough history regardless of all heuristics.
+     * This property is intended for integration or performance tests.
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_PREFER_WAL_REBALANCE = "IGNITE_PREFER_WAL_REBALANCE";
+
     /** Ignite page memory concurrency level. */
     public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2cc9f76..0dd4b35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
 import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -3526,7 +3527,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     ) {
         Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
 
-        Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
+        int grpId = top.groupId();
+
+        Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(grpId) : null;
 
         List<SupplyPartitionInfo> list = new ArrayList<>();
 
@@ -3549,33 +3552,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 Long localHistCntr = localReserved.get(p);
 
                 if (localHistCntr != null && maxCntrObj.nodes.contains(cctx.localNodeId())) {
-                    Long ceilingMinReserved = nonMaxCntrs.ceiling(localHistCntr);
-
-                    if (ceilingMinReserved != null) {
-                        partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, ceilingMinReserved);
-
-                        haveHistory.add(p);
-                    }
-
-                    if (deepestReserved.get2() > localHistCntr)
-                        deepestReserved.set(cctx.localNodeId(), localHistCntr);
+                    findCounterForReservation(grpId, p, maxCntr, localHistCntr, maxCntrObj.size, cctx.localNodeId(),
+                        nonMaxCntrs, haveHistory, deepestReserved);
                 }
             }
 
             for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
-                Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
+                Long histCntr = e0.getValue().partitionHistoryCounters(grpId).get(p);
 
                 if (histCntr != null && maxCntrObj.nodes.contains(e0.getKey())) {
-                    Long ceilingMinReserved = nonMaxCntrs.ceiling(histCntr);
-
-                    if (ceilingMinReserved != null) {
-                        partHistSuppliers.put(e0.getKey(), top.groupId(), p, ceilingMinReserved);
-
-                        haveHistory.add(p);
-                    }
-
-                    if (deepestReserved.get2() > histCntr)
-                        deepestReserved.set(e0.getKey(), histCntr);
+                    findCounterForReservation(grpId, p, maxCntr, histCntr, maxCntrObj.size, e0.getKey(),
+                        nonMaxCntrs, haveHistory, deepestReserved);
                 }
             }
 
@@ -3594,6 +3581,55 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Finds a historical counter in the WAL of a partition owner node, so that the WAL interval from that counter
+     * to the max counter contains fewer updates than the full partition size (unless WAL rebalance is preferred).
+     * If such a counter is found, ownerId is put to the partHistSuppliers map and p is added to haveHistory.
+     *
+     * @param grpId Id of cache group.
+     * @param p Partition id.
+     * @param maxOwnerCntr Max counter of the partition (counter of its owners).
+     * @param ownerReservedHistCntr Min counter which can be reserved for this partition on the owner.
+     * @param ownerSize Size of the owned partition.
+     * @param ownerId Owner node id.
+     * @param nonMaxCntrs Sorted set of non-max counters.
+     * @param haveHistory Modifiable collection of partitions that will be rebalanced historically.
+     * @param deepestReserved Deepest reservation (node id and counter); updated if this owner reserved deeper.
+     */
+    private void findCounterForReservation(
+        int grpId,
+        int p,
+        long maxOwnerCntr,
+        Long ownerReservedHistCntr,
+        long ownerSize,
+        UUID ownerId,
+        NavigableSet<Long> nonMaxCntrs,
+        Set<Integer> haveHistory,
+        T2<UUID, Long> deepestReserved
+    ) {
+        boolean preferWalRebalance = ((GridCacheDatabaseSharedManager)cctx.database()).preferWalRebalance();
+
+        while (!nonMaxCntrs.isEmpty()) {
+            Long ceilingMinReserved = nonMaxCntrs.ceiling(ownerReservedHistCntr);
+
+            if (ceilingMinReserved == null)
+                break;
+
+            if (preferWalRebalance || maxOwnerCntr - ceilingMinReserved < ownerSize) {
+                partHistSuppliers.put(ownerId, grpId, p, ceilingMinReserved);
+
+                haveHistory.add(p);
+
+                break;
+            }
+
+            nonMaxCntrs = nonMaxCntrs.tailSet(ceilingMinReserved, false);
+        }
+
+        if (deepestReserved.get2() > ownerReservedHistCntr)
+            deepestReserved.set(ownerId, ownerReservedHistCntr);
+    }
+
+    /**
      * Detect lost partitions in case of node left or failed. For topology coordinator is called when all {@link
      * GridDhtPartitionsSingleMessage} were received. For other nodes is called when exchange future is completed by
      * {@link GridDhtPartitionsFullMessage}.
@@ -4244,7 +4280,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
 
             if (hasPartitionToLog(supplyInfoMap, true)) {
-                log.info("Partitions were reserved, but maximum available counter is greater than demanded: [" +
+                log.info("Partitions were reserved, but maximum available counter is greater than demanded " +
+                    "or WAL contains too many updates: [" +
                     supplyInfoMap.entrySet().stream().map(entry ->
                         "[grp=" + entry.getKey() + ' ' +
                             entry.getValue().stream().filter(SupplyPartitionInfo::isHistoryReserved).map(info ->
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e263d38..246e363 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -165,6 +165,7 @@ import static java.nio.file.StandardOpenOption.READ;
 import static java.util.Objects.nonNull;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE_PERMITS;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -214,7 +215,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
 
     /** */
-    private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
+    private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500);
+
+    /** Prefer historical rebalance flag. */
+    private final boolean preferWalRebalance = getBoolean(IGNITE_PREFER_WAL_REBALANCE);
 
     /** Value of property for throttling policy override. */
     private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
@@ -414,6 +418,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     }
 
     /**
+     * Tells whether historical (WAL) rebalance is preferred;
+     * {@code false} means that a heuristic is used to determine the rebalance type.
+     *
+     * @return {@code true} if historical rebalance is preferred.
+     */
+    public boolean preferWalRebalance() {
+        return preferWalRebalance;
+    }
+
+    /**
      * For test use only.
      */
     public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
@@ -1739,7 +1753,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 continue;
 
             for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
-                if (locPart.state() == OWNING && locPart.fullSize() > walRebalanceThreshold)
+                if (locPart.state() == OWNING && (preferWalRebalance() || locPart.fullSize() > walRebalanceThreshold))
                     res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
             }
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
index 6e49dee..ad0da5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
@@ -49,6 +49,9 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
     /** Count of restart iterations. */
     public static final int ITERATIONS = 10;
 
+    /** Cache partition count. */
+    public static final int PARTS = 64;
+
     /** Count of backup for default cache. */
     private int backups = 1;
 
@@ -65,7 +68,7 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
                     .setPersistenceEnabled(true)))
             .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
                 .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
-                .setAffinity(new RendezvousAffinityFunction(false, 64))
+                .setAffinity(new RendezvousAffinityFunction(false, PARTS))
                 .setBackups(backups));
     }
 
@@ -177,11 +180,11 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Load some data to default cache.
+     * Loads keys to the default cache: a preload writes significantly more keys than an update and covers all partitions.
      *
-     * @param sequentially True for loading sequential keys, false for random.
+     * @param preload True for preloading keys, false otherwise.
      */
-    public void loadData(boolean sequentially) {
+    public void loadData(boolean preload) {
         Random random = new Random();
 
         Ignite ignite = G.allGrids().get(0);
@@ -189,10 +192,10 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
         try (IgniteDataStreamer streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
             streamer.allowOverwrite(true);
 
-            for (int i = 0; i < 100; i++) {
+            for (int i = 0; i < (preload ? 100 * PARTS : 100); i++) {
                 Integer ranDomKey = random.nextInt(10_000);
 
-                streamer.addData(sequentially ? i : ranDomKey, "Val " + ranDomKey);
+                streamer.addData(preload ? i : ranDomKey, "Val " + ranDomKey);
             }
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
index f28991e..096e5a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
@@ -42,16 +42,20 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 
 /**
  * Test scenario: last supplier has left while a partition on demander is cleared before sending first demand request.
  */
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstractTest {
     /** */
     private static final int PARTS_CNT = 64;
@@ -137,6 +141,7 @@ public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstra
      * @throws Exception If failed.
      */
     @Test
+    @WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
     public void testPartitionLostWhileClearing_FailOnFullMessage() throws Exception {
         lossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
         persistence = true;
@@ -349,7 +354,11 @@ public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstra
 
         final GridDhtLocalPartition part = g1.cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(partId);
 
-        assertEquals(GridDhtPartitionState.LOST, part.state());
+        assertTrue("Unexpected partition state [p=" + partId +
+                ", expected=" + GridDhtPartitionState.LOST +
+                ", actual=" + part.state() + ']',
+            GridTestUtils.waitForCondition(() -> part.state() == GridDhtPartitionState.LOST, 30_000));
+
         assertTrue(g1.cachex(DEFAULT_CACHE_NAME).lostPartitions().contains(partId));
 
         if (mode != 0) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java
new file mode 100644
index 0000000..a774be5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+
+/**
+ * Checks that the way of rebalancing is selected based on a heuristic:
+ * if the number of updates to rebalance is greater than the partition size, full rebalance should be used.
+ */
+@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@RunWith(Parameterized.class)
+public class HistoricalRebalanceHeuristicsTest extends GridCommonAbstractTest {
+    /** Initial keys. */
+    private static final int INITIAL_KEYS = 5000;
+
+    /** Whether historical rebalance is expected ({@code true}) or full rebalance ({@code false}). */
+    @Parameterized.Parameter()
+    public boolean historical;
+
+    /** Full rebalancing happened flag. */
+    private final AtomicBoolean fullRebalancingHappened = new AtomicBoolean(false);
+
+    /** Historical rebalancing happened flag. */
+    private final AtomicBoolean historicalRebalancingHappened = new AtomicBoolean(false);
+
+    /**
+     * @return Test parameters: one run expecting historical rebalance and one expecting full rebalance.
+     */
+    @Parameterized.Parameters(name = "historical = {0}")
+    public static Collection<Object[]> testData() {
+        List<Object[]> res = new ArrayList<>();
+
+        res.add(new Object[] {true});
+        res.add(new Object[] {false});
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        cfg.setCacheConfiguration(new CacheConfiguration()
+            .setAffinity(new RendezvousAffinityFunction(false, 8))
+            .setBackups(1)
+            .setName(DEFAULT_CACHE_NAME));
+
+        TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+                if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME)) {
+                    if (demandMsg.partitions().hasFull())
+                        fullRebalancingHappened.set(true);
+
+                    if (demandMsg.partitions().hasHistorical())
+                        historicalRebalancingHappened.set(true);
+                }
+            }
+
+            return false; // Nothing is blocked: the predicate only records demand message types.
+        });
+
+        cfg.setCommunicationSpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        fullRebalancingHappened.set(false);
+        historicalRebalancingHappened.set(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks that the heuristic (see class header doc) selects historical or full rebalance as expected.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRemoveReorderingConsistency() throws Exception {
+        IgniteEx grid = startGrids(2);
+
+        grid.cluster().state(ClusterState.ACTIVE);
+
+        IgniteInternalCache<Integer, Integer> cache = grid.cachex(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < INITIAL_KEYS; i++)
+            cache.put(i, i);
+
+        forceCheckpoint();
+
+        stopGrid(1);
+
+        int limit = historical ? INITIAL_KEYS * 3 / 2 : INITIAL_KEYS * 5 / 2;
+
+        for (int i = INITIAL_KEYS; i < limit; i++)
+            cache.put(i % INITIAL_KEYS, i);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange(true, true, null);
+
+        assertEquals(historical, historicalRebalancingHappened.get());
+
+        assertEquals(!historical, fullRebalancingHappened.get());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
index cce9a40..64e800c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
@@ -42,7 +42,7 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom
     /** {@inheritDoc */
     @Override protected void beforeTest() throws Exception {
         // Use rebalance from WAL if possible.
-        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+        System.setProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE, "true");
 
         super.beforeTest();
     }
@@ -54,7 +54,7 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom
 
         IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
 
-        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+        System.clearProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE);
 
         super.afterTest();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
index 8236bd3..628acfb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
@@ -42,7 +42,7 @@ public class IgnitePdsTxHistoricalRebalancingTest extends IgnitePdsTxCacheRebala
     /** {@inheritDoc */
     @Override protected void beforeTest() throws Exception {
         // Use rebalance from WAL if possible.
-        System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+        System.setProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE, "true");
 
         super.beforeTest();
     }
@@ -54,7 +54,7 @@ public class IgnitePdsTxHistoricalRebalancingTest extends IgnitePdsTxCacheRebala
 
         IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
 
-        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+        System.clearProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE);
 
         super.afterTest();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
index 9eaafd6..7db500a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
@@ -117,8 +117,9 @@ public class FullHistRebalanceOnClientStopTest extends GridCommonAbstractTest {
         startClientGrid(5);
 
         final int entryCnt = PARTS_CNT * 1000;
+        final int preloadEntryCnt = PARTS_CNT * 1001;
 
-        for (int i = 0; i < entryCnt; i++)
+        for (int i = 0; i < preloadEntryCnt; i++)
             cache.put(i, i);
 
         forceCheckpoint();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
index 9b21b49..9c36299 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
@@ -171,7 +171,8 @@ public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest {
     @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
     public void testFullRebalanceWithShortCpHistoryLogMsgs() throws Exception {
         LogListener expMsgsLsnr = LogListener.
-            matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded: ") &&
+            matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than " +
+                "demanded or WAL contains too many updates: ") &&
                 str.contains("grp=cache_group1") && str.contains("grp=cache_group2")).
             andMatches(str -> str.startsWith("Starting rebalance routine") &&
                 (str.contains("cache_group1") || str.contains("cache_group2")) &&
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 191be72..afa06f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -198,12 +198,13 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         IgniteEx ig1 = startGrid(1);
 
         final int entryCnt = PARTS_CNT * 100;
+        final int preloadEntryCnt = PARTS_CNT * 101;
 
         ig0.cluster().active(true);
 
         IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
 
-        for (int k = 0; k < entryCnt; k++)
+        for (int k = 0; k < preloadEntryCnt; k++)
             cache.put(k, new IndexedObject(k));
 
         forceCheckpoint();
@@ -240,12 +241,13 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         IgniteEx ig1 = startGrid(1);
 
         final int entryCnt = PARTS_CNT * 100;
+        final int preloadEntryCnt = PARTS_CNT * 135;
 
         ig0.cluster().active(true);
 
         IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
 
-        for (int k = 0; k < entryCnt; k++)
+        for (int k = 0; k < preloadEntryCnt; k++)
             cache.put(k, new IndexedObject(k));
 
         forceCheckpoint();
@@ -293,11 +295,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         crd.cluster().active(true);
 
         final int entryCnt = PARTS_CNT * 10;
+        final int preloadEntryCnt = PARTS_CNT * 11;
 
         {
             IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
 
-            for (int k = 0; k < entryCnt; k++)
+            for (int k = 0; k < preloadEntryCnt; k++)
                 cache.put(k, new IndexedObject(k - 1));
         }
 
@@ -385,10 +388,14 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         final int entryCnt = PARTS_CNT * 10;
 
+        final int preloadEntryCnt = PARTS_CNT * 11;
+
         {
             IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
 
-            for (int k = 0; k < entryCnt; k++)
+            // Preload must be larger than the data coming through historical rebalance,
+            // otherwise the cluster may choose a full rebalance instead of a historical one.
+            for (int k = 0; k < preloadEntryCnt; k++)
                 cache.put(k, new IndexedObject(k - 1));
         }
 
@@ -466,11 +473,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
         crd.cluster().active(true);
 
         final int entryCnt = PARTS_CNT * 10;
+        final int preloadEntryCnt = PARTS_CNT * 11;
 
         {
             IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
 
-            for (int k = 0; k < entryCnt; k++)
+            for (int k = 0; k < preloadEntryCnt; k++)
                 cache.put(k, new IndexedObject(k - 1));
         }
 
@@ -579,7 +587,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         // Fill initial data and force checkpoint.
         final int entryCnt = PARTS_CNT * 200;
-        for (int k = 0; k < entryCnt; k++)
+        final int preloadEntryCnt = PARTS_CNT * 201;
+
+        for (int k = 0; k < preloadEntryCnt; k++)
             cache0.put(k, new IndexedObject(k));
 
         forceCheckpoint();
@@ -823,7 +833,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
 
         // Fill initial data.
         final int entryCnt = PARTS_CNT * 200;
-        for (int k = 0; k < entryCnt; k++) {
+        final int preloadEntryCnt = PARTS_CNT * 400;
+
+        for (int k = 0; k < preloadEntryCnt; k++) {
             c1.put(k, new IndexedObject(k));
 
             c2.put(k, new IndexedObject(k));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 2eff7ca..e808b91 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 
 /**
  * Test partitions consistency in various scenarios when all rebalance is historical.
  */
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class AtomicPartitionCounterStateConsistencyHistoryRebalanceTest extends AtomicPartitionCounterStateConsistencyTest {
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 54dece3..1e56992 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
 
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 
 /**
  * Test partitions consistency in various scenarios when all rebalance is historical.
  */
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class TxPartitionCounterStateConsistencyHistoryRebalanceTest extends TxPartitionCounterStateConsistencyTest {
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
index b2da089..bcad022 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
@@ -20,11 +20,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 
 /**
  */
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest
     extends TxPartitionCounterStateOnePrimaryOneBackupTest {
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
index 6a50a38..da7150b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
@@ -22,12 +22,12 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 
 /**
  * Tests partition consistency recovery in case then all owners are lost in the middle of transaction.
  */
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest extends
     TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest {
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
index 1877f9b..e6c64ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
@@ -22,12 +22,12 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
 
 /**
  *
  */
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
 public class TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest
     extends TxPartitionCounterStateOnePrimaryTwoBackupsTest {
     /** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index c988311..400ef14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -581,7 +581,11 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
 
         // Check that the value was actually persisted to the storage.
 
-        startGrid(0);
+        IgniteEx ignite0 = startGrid(0);
+
+        awaitPartitionMapExchange();
+
+        assertSame(ignite0.cluster().state(), ClusterState.ACTIVE);
 
         assertEquals("value", metastorage(0).read(longKey));
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index d930b3b..5eeca92 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheStartStopWithFreqCheckpointTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
@@ -259,5 +260,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);
+
+        GridTestUtils.addTestIfNeeded(suite, HistoricalRebalanceHeuristicsTest.class, ignoredTests);
     }
 }