You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/08/18 13:55:54 UTC
[ignite] branch master updated: IGNITE-13253 Advanced heuristics
for historical rebalance - Fixes #8160
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b82b650 IGNITE-13253 Advanced heuristics for historical rebalance - Fixes #8160
b82b650 is described below
commit b82b65050c50e8820bd1480a38aefb2d20e4b0f1
Author: Ivan Rakov <iv...@gmail.com>
AuthorDate: Tue Aug 18 16:55:10 2020 +0300
IGNITE-13253 Advanced heuristics for historical rebalance - Fixes #8160
---
.../org/apache/ignite/IgniteSystemProperties.java | 7 +
.../preloader/GridDhtPartitionsExchangeFuture.java | 83 +++++++---
.../GridCacheDatabaseSharedManager.java | 18 ++-
.../apache/ignite/cache/CircledRebalanceTest.java | 15 +-
...CachePartitionLostAfterSupplierHasLeftTest.java | 11 +-
.../HistoricalRebalanceHeuristicsTest.java | 172 +++++++++++++++++++++
...itePdsAtomicCacheHistoricalRebalancingTest.java | 4 +-
.../IgnitePdsTxHistoricalRebalancingTest.java | 4 +-
.../db/FullHistRebalanceOnClientStopTest.java | 3 +-
.../db/wal/IgniteWalRebalanceLoggingTest.java | 3 +-
.../persistence/db/wal/IgniteWalRebalanceTest.java | 26 +++-
...ounterStateConsistencyHistoryRebalanceTest.java | 4 +-
...ounterStateConsistencyHistoryRebalanceTest.java | 4 +-
...ateOnePrimaryOneBackupHistoryRebalanceTest.java | 4 +-
...imaryTwoBackupsFailAllHistoryRebalanceTest.java | 4 +-
...teOnePrimaryTwoBackupsHistoryRebalanceTest.java | 4 +-
.../DistributedMetaStoragePersistentTest.java | 6 +-
.../ignite/testsuites/IgnitePdsTestSuite2.java | 3 +
18 files changed, 319 insertions(+), 56 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 18b351f..d1f59af 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -746,6 +746,13 @@ public final class IgniteSystemProperties {
*/
public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
+ /**
+ * Prefer historical rebalance if there's enough history, regardless of all heuristics.
+ * This property is intended for integration or performance tests.
+ * Default is {@code false}.
+ */
+ public static final String IGNITE_PREFER_WAL_REBALANCE = "IGNITE_PREFER_WAL_REBALANCE";
+
/** Ignite page memory concurrency level. */
public static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 2cc9f76..0dd4b35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -3526,7 +3527,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
) {
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
- Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
+ int grpId = top.groupId();
+
+ Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(grpId) : null;
List<SupplyPartitionInfo> list = new ArrayList<>();
@@ -3549,33 +3552,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
Long localHistCntr = localReserved.get(p);
if (localHistCntr != null && maxCntrObj.nodes.contains(cctx.localNodeId())) {
- Long ceilingMinReserved = nonMaxCntrs.ceiling(localHistCntr);
-
- if (ceilingMinReserved != null) {
- partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, ceilingMinReserved);
-
- haveHistory.add(p);
- }
-
- if (deepestReserved.get2() > localHistCntr)
- deepestReserved.set(cctx.localNodeId(), localHistCntr);
+ findCounterForReservation(grpId, p, maxCntr, localHistCntr, maxCntrObj.size, cctx.localNodeId(),
+ nonMaxCntrs, haveHistory, deepestReserved);
}
}
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
- Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
+ Long histCntr = e0.getValue().partitionHistoryCounters(grpId).get(p);
if (histCntr != null && maxCntrObj.nodes.contains(e0.getKey())) {
- Long ceilingMinReserved = nonMaxCntrs.ceiling(histCntr);
-
- if (ceilingMinReserved != null) {
- partHistSuppliers.put(e0.getKey(), top.groupId(), p, ceilingMinReserved);
-
- haveHistory.add(p);
- }
-
- if (deepestReserved.get2() > histCntr)
- deepestReserved.set(e0.getKey(), histCntr);
+ findCounterForReservation(grpId, p, maxCntr, histCntr, maxCntrObj.size, e0.getKey(),
+ nonMaxCntrs, haveHistory, deepestReserved);
}
}
@@ -3594,6 +3581,55 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+     * Searches the WAL history of a partition owner for a counter from which historical rebalance is worthwhile,
+     * i.e. such that the WAL interval from that counter up to the max counter contains fewer updates than
+     * the full partition size. The size check is skipped when historical rebalance is preferred.
+     * If a suitable counter is found, {@code ownerId} is added to the {@code partHistSuppliers} map
+     * and the partition is added to {@code haveHistory}.
+     * Independently of that, the deepest reservation is lowered to the owner's reserved counter if it is smaller.
+     *
+     * @param grpId Id of cache group.
+     * @param p Partition Id.
+     * @param maxOwnerCntr Counter of owning partition.
+     * @param ownerReservedHistCntr Min counter which can be reserved for this partition.
+     * @param ownerSize Size of owned partition.
+     * @param ownerId Owner node id.
+     * @param nonMaxCntrs Sorted set of non max counters.
+     * @param haveHistory Modifiable collection for partitions that will be rebalanced historically.
+     * @param deepestReserved The deepest reservation per node id.
+     */
+    private void findCounterForReservation(
+        int grpId,
+        int p,
+        long maxOwnerCntr,
+        Long ownerReservedHistCntr,
+        long ownerSize,
+        UUID ownerId,
+        NavigableSet<Long> nonMaxCntrs,
+        Set<Integer> haveHistory,
+        T2<UUID, Long> deepestReserved
+    ) {
+        boolean forceHistorical = ((GridCacheDatabaseSharedManager)cctx.database()).preferWalRebalance();
+
+        // Walk candidate counters in ascending order, starting from the lowest one
+        // that is not below the owner's reserved history counter.
+        for (Long candidate = nonMaxCntrs.ceiling(ownerReservedHistCntr);
+            candidate != null;
+            candidate = nonMaxCntrs.higher(candidate)) {
+            boolean tooManyUpdates = !forceHistorical && maxOwnerCntr - candidate >= ownerSize;
+
+            // The WAL interval is not smaller than the partition itself: try the next, higher counter.
+            if (tooManyUpdates)
+                continue;
+
+            partHistSuppliers.put(ownerId, grpId, p, candidate);
+
+            haveHistory.add(p);
+
+            break;
+        }
+
+        if (ownerReservedHistCntr < deepestReserved.get2())
+            deepestReserved.set(ownerId, ownerReservedHistCntr);
+    }
+
+ /**
* Detect lost partitions in case of node left or failed. For topology coordinator is called when all {@link
* GridDhtPartitionsSingleMessage} were received. For other nodes is called when exchange future is completed by
* {@link GridDhtPartitionsFullMessage}.
@@ -4244,7 +4280,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
if (hasPartitionToLog(supplyInfoMap, true)) {
- log.info("Partitions were reserved, but maximum available counter is greater than demanded: [" +
+ log.info("Partitions were reserved, but maximum available counter is greater than demanded " +
+ "or WAL contains too many updates: [" +
supplyInfoMap.entrySet().stream().map(entry ->
"[grp=" + entry.getKey() + ' ' +
entry.getValue().stream().filter(SupplyPartitionInfo::isHistoryReserved).map(info ->
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e263d38..246e363 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -165,6 +165,7 @@ import static java.nio.file.StandardOpenOption.READ;
import static java.util.Objects.nonNull;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE_PERMITS;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -214,7 +215,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
/** */
- private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
+ private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500);
+
+ /** Prefer historical rebalance flag. */
+ private final boolean preferWalRebalance = getBoolean(IGNITE_PREFER_WAL_REBALANCE);
/** Value of property for throttling policy override. */
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
@@ -414,6 +418,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
+ * Tells whether historical rebalance is preferred over the heuristics.
+ *
+ * @return {@code true} if historical rebalance is preferred, {@code false} if a heuristic is used
+ * to determine the rebalance type.
+ */
+ public boolean preferWalRebalance() {
+ return preferWalRebalance;
+ }
+
+ /**
* For test use only.
*/
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
@@ -1739,7 +1753,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
continue;
for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
- if (locPart.state() == OWNING && locPart.fullSize() > walRebalanceThreshold)
+ if (locPart.state() == OWNING && (preferWalRebalance() || locPart.fullSize() > walRebalanceThreshold))
res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
index 6e49dee..ad0da5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/CircledRebalanceTest.java
@@ -49,6 +49,9 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
/** Count of restart iterations. */
public static final int ITERATIONS = 10;
+ /** Cache partition count. */
+ public static final int PARTS = 64;
+
/** Count of backup for default cache. */
private int backups = 1;
@@ -65,7 +68,7 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
.setPersistenceEnabled(true)))
.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
- .setAffinity(new RendezvousAffinityFunction(false, 64))
+ .setAffinity(new RendezvousAffinityFunction(false, PARTS))
.setBackups(backups));
}
@@ -177,11 +180,11 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
}
/**
- * Load some data to default cache.
+ * Loads keys into the default cache. A preload writes significantly more keys than an update and covers all cache partitions.
*
- * @param sequentially True for loading sequential keys, false for random.
+ * @param preload True for preloading keys, false otherwise.
*/
- public void loadData(boolean sequentially) {
+ public void loadData(boolean preload) {
Random random = new Random();
Ignite ignite = G.allGrids().get(0);
@@ -189,10 +192,10 @@ public class CircledRebalanceTest extends GridCommonAbstractTest {
try (IgniteDataStreamer streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
streamer.allowOverwrite(true);
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < (preload ? 100 * PARTS : 100); i++) {
Integer ranDomKey = random.nextInt(10_000);
- streamer.addData(sequentially ? i : ranDomKey, "Val " + ranDomKey);
+ streamer.addData(preload ? i : ranDomKey, "Val " + ranDomKey);
}
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
index f28991e..096e5a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CachePartitionLostAfterSupplierHasLeftTest.java
@@ -42,16 +42,20 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
/**
* Test scenario: last supplier has left while a partition on demander is cleared before sending first demand request.
*/
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstractTest {
/** */
private static final int PARTS_CNT = 64;
@@ -137,6 +141,7 @@ public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstra
* @throws Exception If failed.
*/
@Test
+ @WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public void testPartitionLostWhileClearing_FailOnFullMessage() throws Exception {
lossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
persistence = true;
@@ -349,7 +354,11 @@ public class CachePartitionLostAfterSupplierHasLeftTest extends GridCommonAbstra
final GridDhtLocalPartition part = g1.cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(partId);
- assertEquals(GridDhtPartitionState.LOST, part.state());
+ assertTrue("Unexpected partition state [p=" + partId +
+ ", expected=" + GridDhtPartitionState.LOST +
+ ", actual=" + part.state() + ']',
+ GridTestUtils.waitForCondition(() -> part.state() == GridDhtPartitionState.LOST, 30_000));
+
assertTrue(g1.cachex(DEFAULT_CACHE_NAME).lostPartitions().contains(partId));
if (mode != 0) {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java
new file mode 100644
index 0000000..a774be5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/HistoricalRebalanceHeuristicsTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+
+/**
+ * Checks that the rebalance mode is selected based on a heuristic:
+ * if the number of updates to rebalance is greater than the partition size, full rebalance should be used.
+ */
+@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@RunWith(Parameterized.class)
+public class HistoricalRebalanceHeuristicsTest extends GridCommonAbstractTest {
+    /** Initial keys. */
+    private static final int INITIAL_KEYS = 5000;
+
+    /** Test parameter: {@code true} if historical rebalance is expected, {@code false} if full rebalance is. */
+    @Parameterized.Parameter()
+    public boolean historical;
+
+    /** Full rebalancing happened flag. */
+    private final AtomicBoolean fullRebalancingHappened = new AtomicBoolean(false);
+
+    /** Historical rebalancing happened flag. */
+    private final AtomicBoolean historicalRebalancingHappened = new AtomicBoolean(false);
+
+    /**
+     * @return Test parameters: one run expecting historical rebalance and one expecting full rebalance.
+     */
+    @Parameterized.Parameters(name = "historical = {0}")
+    public static Collection<Object[]> testData() {
+        List<Object[]> res = new ArrayList<>();
+
+        res.add(new Object[] {true});
+        res.add(new Object[] {false});
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setConsistentId(name);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(DataStorageConfiguration.DFLT_DATA_REGION_INITIAL_SIZE)
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        cfg.setCacheConfiguration(new CacheConfiguration()
+            .setAffinity(new RendezvousAffinityFunction(false, 8))
+            .setBackups(1)
+            .setName(DEFAULT_CACHE_NAME));
+
+        TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
+
+        // The predicate always returns false, so nothing is actually blocked:
+        // it is only used to observe which kind of demand messages are sent for the default cache.
+        spi.blockMessages((node, msg) -> {
+            if (msg instanceof GridDhtPartitionDemandMessage) {
+                GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+                if (demandMsg.groupId() == CU.cacheId(DEFAULT_CACHE_NAME)) {
+                    if (demandMsg.partitions().hasFull())
+                        fullRebalancingHappened.set(true);
+
+                    if (demandMsg.partitions().hasHistorical())
+                        historicalRebalancingHappened.set(true);
+                }
+            }
+
+            return false;
+        });
+
+        cfg.setCommunicationSpi(spi);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        fullRebalancingHappened.set(false);
+        historicalRebalancingHappened.set(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Delegate to the matching super hook (the original called super.beforeTest() here by mistake).
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks that heuristic (see class header doc) works correctly.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRemoveReorderingConsistency() throws Exception {
+        IgniteEx grid = startGrids(2);
+
+        grid.cluster().state(ClusterState.ACTIVE);
+
+        IgniteInternalCache<Integer, Integer> cache = grid.cachex(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < INITIAL_KEYS; i++)
+            cache.put(i, i);
+
+        forceCheckpoint();
+
+        stopGrid(1);
+
+        // While node 1 is down, overwrite existing keys: INITIAL_KEYS / 2 updates when historical rebalance
+        // is expected (fewer updates than keys), INITIAL_KEYS * 3 / 2 updates when full rebalance is expected.
+        int limit = historical ? INITIAL_KEYS * 3 / 2 : INITIAL_KEYS * 5 / 2;
+
+        for (int i = INITIAL_KEYS; i < limit; i++)
+            cache.put(i % INITIAL_KEYS, i);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange(true, true, null);
+
+        assertEquals(historical, historicalRebalancingHappened.get());
+
+        assertEquals(!historical, fullRebalancingHappened.get());
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
index cce9a40..64e800c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsAtomicCacheHistoricalRebalancingTest.java
@@ -42,7 +42,7 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom
/** {@inheritDoc */
@Override protected void beforeTest() throws Exception {
// Use rebalance from WAL if possible.
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+ System.setProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE, "true");
super.beforeTest();
}
@@ -54,7 +54,7 @@ public class IgnitePdsAtomicCacheHistoricalRebalancingTest extends IgnitePdsAtom
IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+ System.clearProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE);
super.afterTest();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
index 8236bd3..628acfb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsTxHistoricalRebalancingTest.java
@@ -42,7 +42,7 @@ public class IgnitePdsTxHistoricalRebalancingTest extends IgnitePdsTxCacheRebala
/** {@inheritDoc */
@Override protected void beforeTest() throws Exception {
// Use rebalance from WAL if possible.
- System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");
+ System.setProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE, "true");
super.beforeTest();
}
@@ -54,7 +54,7 @@ public class IgnitePdsTxHistoricalRebalancingTest extends IgnitePdsTxCacheRebala
IgniteWalRebalanceTest.WalRebalanceCheckingCommunicationSpi.cleanup();
- System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+ System.clearProperty(IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE);
super.afterTest();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
index 9eaafd6..7db500a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/FullHistRebalanceOnClientStopTest.java
@@ -117,8 +117,9 @@ public class FullHistRebalanceOnClientStopTest extends GridCommonAbstractTest {
startClientGrid(5);
final int entryCnt = PARTS_CNT * 1000;
+ final int preloadEntryCnt = PARTS_CNT * 1001;
- for (int i = 0; i < entryCnt; i++)
+ for (int i = 0; i < preloadEntryCnt; i++)
cache.put(i, i);
forceCheckpoint();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
index 9b21b49..9c36299 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceLoggingTest.java
@@ -171,7 +171,8 @@ public class IgniteWalRebalanceLoggingTest extends GridCommonAbstractTest {
@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "1")
public void testFullRebalanceWithShortCpHistoryLogMsgs() throws Exception {
LogListener expMsgsLsnr = LogListener.
- matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than demanded: ") &&
+ matches(str -> str.startsWith("Partitions were reserved, but maximum available counter is greater than " +
+ "demanded or WAL contains too many updates: ") &&
str.contains("grp=cache_group1") && str.contains("grp=cache_group2")).
andMatches(str -> str.startsWith("Starting rebalance routine") &&
(str.contains("cache_group1") || str.contains("cache_group2")) &&
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
index 191be72..afa06f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalRebalanceTest.java
@@ -198,12 +198,13 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
IgniteEx ig1 = startGrid(1);
final int entryCnt = PARTS_CNT * 100;
+ final int preloadEntryCnt = PARTS_CNT * 101;
ig0.cluster().active(true);
IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
- for (int k = 0; k < entryCnt; k++)
+ for (int k = 0; k < preloadEntryCnt; k++)
cache.put(k, new IndexedObject(k));
forceCheckpoint();
@@ -240,12 +241,13 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
IgniteEx ig1 = startGrid(1);
final int entryCnt = PARTS_CNT * 100;
+ final int preloadEntryCnt = PARTS_CNT * 135;
ig0.cluster().active(true);
IgniteCache<Object, Object> cache = ig0.cache(CACHE_NAME);
- for (int k = 0; k < entryCnt; k++)
+ for (int k = 0; k < preloadEntryCnt; k++)
cache.put(k, new IndexedObject(k));
forceCheckpoint();
@@ -293,11 +295,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
crd.cluster().active(true);
final int entryCnt = PARTS_CNT * 10;
+ final int preloadEntryCnt = PARTS_CNT * 11;
{
IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
- for (int k = 0; k < entryCnt; k++)
+ for (int k = 0; k < preloadEntryCnt; k++)
cache.put(k, new IndexedObject(k - 1));
}
@@ -385,10 +388,14 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
final int entryCnt = PARTS_CNT * 10;
+ final int preloadEntryCnt = PARTS_CNT * 11;
+
{
IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
- for (int k = 0; k < entryCnt; k++)
+ // Preload must be larger than the amount of data coming through historical rebalance,
+ // otherwise the cluster may choose a full rebalance instead of a historical one.
+ for (int k = 0; k < preloadEntryCnt; k++)
cache.put(k, new IndexedObject(k - 1));
}
@@ -466,11 +473,12 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
crd.cluster().active(true);
final int entryCnt = PARTS_CNT * 10;
+ final int preloadEntryCnt = PARTS_CNT * 11;
{
IgniteCache<Object, Object> cache = crd.cache(CACHE_NAME);
- for (int k = 0; k < entryCnt; k++)
+ for (int k = 0; k < preloadEntryCnt; k++)
cache.put(k, new IndexedObject(k - 1));
}
@@ -579,7 +587,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
// Fill initial data and force checkpoint.
final int entryCnt = PARTS_CNT * 200;
- for (int k = 0; k < entryCnt; k++)
+ final int preloadEntryCnt = PARTS_CNT * 201;
+
+ for (int k = 0; k < preloadEntryCnt; k++)
cache0.put(k, new IndexedObject(k));
forceCheckpoint();
@@ -823,7 +833,9 @@ public class IgniteWalRebalanceTest extends GridCommonAbstractTest {
// Fill initial data.
final int entryCnt = PARTS_CNT * 200;
- for (int k = 0; k < entryCnt; k++) {
+ final int preloadEntryCnt = PARTS_CNT * 400;
+
+ for (int k = 0; k < preloadEntryCnt; k++) {
c1.put(k, new IndexedObject(k));
c2.put(k, new IndexedObject(k));
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 2eff7ca..e808b91 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.testframework.junits.WithSystemProperty;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
/**
* Test partitions consistency in various scenarios when all rebalance is historical.
*/
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class AtomicPartitionCounterStateConsistencyHistoryRebalanceTest extends AtomicPartitionCounterStateConsistencyTest {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
index 54dece3..1e56992 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyHistoryRebalanceTest.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.testframework.junits.WithSystemProperty;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
/**
* Test partitions consistency in various scenarios when all rebalance is historical.
*/
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class TxPartitionCounterStateConsistencyHistoryRebalanceTest extends TxPartitionCounterStateConsistencyTest {
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
index b2da089..bcad022 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest.java
@@ -20,11 +20,11 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceTest;
import org.apache.ignite.testframework.junits.WithSystemProperty;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
/**
*/
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class TxPartitionCounterStateOnePrimaryOneBackupHistoryRebalanceTest
extends TxPartitionCounterStateOnePrimaryOneBackupTest {
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
index 6a50a38..da7150b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest.java
@@ -22,12 +22,12 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
/**
* Tests partition consistency recovery in case when all owners are lost in the middle of a transaction.
*/
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class TxPartitionCounterStateOnePrimaryTwoBackupsFailAllHistoryRebalanceTest extends
TxPartitionCounterStateOnePrimaryTwoBackupsFailAllTest {
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
index 1877f9b..e6c64ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
@@ -22,12 +22,12 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Ignore;
import org.junit.Test;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_PREFER_WAL_REBALANCE;
/**
*
*/
-@WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+@WithSystemProperty(key = IGNITE_PREFER_WAL_REBALANCE, value = "true")
public class TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest
extends TxPartitionCounterStateOnePrimaryTwoBackupsTest {
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
index c988311..400ef14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/metastorage/DistributedMetaStoragePersistentTest.java
@@ -581,7 +581,11 @@ public class DistributedMetaStoragePersistentTest extends DistributedMetaStorage
// Check that the value was actually persisted to the storage.
- startGrid(0);
+ IgniteEx ignite0 = startGrid(0);
+
+ awaitPartitionMapExchange();
+
+ assertSame(ignite0.cluster().state(), ClusterState.ACTIVE);
assertEquals("value", metastorage(0).read(longKey));
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index d930b3b..5eeca92 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -20,6 +20,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.HistoricalRebalanceHeuristicsTest;
import org.apache.ignite.internal.processors.cache.persistence.IgniteDataStorageMetricsSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCacheStartStopWithFreqCheckpointTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedStoreTest;
@@ -259,5 +260,7 @@ public class IgnitePdsTestSuite2 {
GridTestUtils.addTestIfNeeded(suite, WalPreloadingConcurrentTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests);
+
+ GridTestUtils.addTestIfNeeded(suite, HistoricalRebalanceHeuristicsTest.class, ignoredTests);
}
}