You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ir...@apache.org on 2020/06/23 12:35:53 UTC
[ignite] branch master updated: IGNITE-13168 Retrigger historical
rebalance if it was cancelled in case WAL history is still available -
Fixes #7948.
This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c915cea IGNITE-13168 Retrigger historical rebalance if it was cancelled in case WAL history is still available - Fixes #7948.
c915cea is described below
commit c915ceaa2d12f73ae653f894d7159176aed7f9a3
Author: vd_pyatkov <vl...@gmail.com>
AuthorDate: Tue Jun 23 15:35:09 2020 +0300
IGNITE-13168 Retrigger historical rebalance if it was cancelled in case WAL history is still available - Fixes #7948.
Signed-off-by: Ivan Rakov <iv...@gmail.com>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 167 +++++++----
.../dht/preloader/GridDhtPreloader.java | 9 +-
.../IgniteDhtPartitionHistorySuppliersMap.java | 15 +-
.../db/wal/WalRebalanceRestartTest.java | 321 +++++++++++++++++++++
.../TxPartitionCounterStateConsistencyTest.java | 121 ++++----
.../ignite/testsuites/IgnitePdsTestSuite4.java | 2 +
6 files changed, 513 insertions(+), 122 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 4f9e1dd..c940813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -27,8 +27,10 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -532,9 +534,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param grpId Cache group ID.
* @param partId Partition ID.
* @param cntrSince Partition update counter since history supplying is requested.
- * @return ID of history supplier node or null if it doesn't exist.
+ * @return List of IDs of history supplier nodes or an empty list if none exist.
*/
- @Nullable public UUID partitionHistorySupplier(int grpId, int partId, long cntrSince) {
+ public List<UUID> partitionHistorySupplier(int grpId, int partId, long cntrSince) {
return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
}
@@ -3274,10 +3276,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* Collects and determines new owners of partitions for all nodes for given {@code top}.
*
* @param top Topology to assign.
+ * @param resetOwners True if partition states need to be reset according to update counters, false otherwise.
*/
- private void assignPartitionStates(GridDhtPartitionTopology top) {
+ private void assignPartitionStates(GridDhtPartitionTopology top, boolean resetOwners) {
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
- Map<Integer, Long> minCntrs = new HashMap<>();
+ Map<Integer, TreeSet<Long>> varCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(),
@@ -3288,9 +3291,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
for (int i = 0; i < nodeCntrs.size(); i++) {
int p = nodeCntrs.partitionAt(i);
- UUID uuid = e.getKey();
+ UUID remoteNodeId = e.getKey();
- GridDhtPartitionState state = top.partitionState(uuid, p);
+ GridDhtPartitionState state = top.partitionState(remoteNodeId, p);
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
continue;
@@ -3299,10 +3302,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
nodeCntrs.initialUpdateCounterAt(i) :
nodeCntrs.updateCounterAt(i);
- Long minCntr = minCntrs.get(p);
-
- if (minCntr == null || minCntr > cntr)
- minCntrs.put(p, cntr);
+ varCntrs.computeIfAbsent(p, key -> new TreeSet<>()).add(cntr);
if (state != GridDhtPartitionState.OWNING)
continue;
@@ -3310,9 +3310,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
CounterWithNodes maxCntr = maxCntrs.get(p);
if (maxCntr == null || cntr > maxCntr.cnt)
- maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), uuid));
+ maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), remoteNodeId));
else if (cntr == maxCntr.cnt)
- maxCntr.nodes.add(uuid);
+ maxCntr.nodes.add(remoteNodeId);
}
}
@@ -3323,12 +3323,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
continue;
- final long cntr = state == GridDhtPartitionState.MOVING ? part.initialUpdateCounter() : part.updateCounter();
-
- Long minCntr = minCntrs.get(part.id());
+ final long cntr = state == GridDhtPartitionState.MOVING ?
+ part.initialUpdateCounter() :
+ part.updateCounter();
- if (minCntr == null || minCntr > cntr)
- minCntrs.put(part.id(), cntr);
+ varCntrs.computeIfAbsent(part.id(), key -> new TreeSet<>()).add(cntr);
if (state != GridDhtPartitionState.OWNING)
continue;
@@ -3351,68 +3350,114 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
maxCntr.nodes.add(cctx.localNodeId());
}
+ Set<Integer> haveHistory = new HashSet<>();
+
+ assignHistoricalSuppliers(top, maxCntrs, varCntrs, haveHistory);
+
+ if (resetOwners)
+ resetOwnersByCounter(top, maxCntrs, haveHistory);
+ }
+
+ /**
+ * Determine new owners for partitions.
+ * If any OWNING partition has a counter less than the maximum, that partition is forcibly moved to the MOVING state.
+ *
+ * @param top Topology.
+ * @param maxCntrs Max counter partition map.
+ * @param haveHistory Set of partitions which have a historical supplier.
+ */
+ private void resetOwnersByCounter(GridDhtPartitionTopology top,
+ Map<Integer, CounterWithNodes> maxCntrs, Set<Integer> haveHistory) {
+ Map<Integer, Set<UUID>> ownersByUpdCounters = U.newHashMap(maxCntrs.size());
+ Map<Integer, Long> partSizes = U.newHashMap(maxCntrs.size());
+
+ for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet()) {
+ ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
+
+ partSizes.put(e.getKey(), e.getValue().size);
+ }
+
+ top.globalPartSizes(partSizes);
+
+ Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(
+ ownersByUpdCounters, haveHistory, this);
+
+ for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
+ UUID nodeId = e.getKey();
+ Set<Integer> parts = e.getValue();
+
+ for (int part : parts)
+ partsToReload.put(nodeId, top.groupId(), part);
+ }
+ }
+
+ /**
+ * Find and assign suppliers for history rebalance.
+ *
+ * @param top Topology.
+ * @param maxCntrs Max counter partition map.
+ * @param varCntrs Various counters for each partition.
+ * @param haveHistory Set of partitions which have a historical supplier.
+ */
+ private void assignHistoricalSuppliers(
+ GridDhtPartitionTopology top,
+ Map<Integer, CounterWithNodes> maxCntrs,
+ Map<Integer, TreeSet<Long>> varCntrs,
+ Set<Integer> haveHistory
+ ) {
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
- Set<Integer> haveHistory = new HashSet<>();
-
- for (Map.Entry<Integer, Long> e : minCntrs.entrySet()) {
+ for (Map.Entry<Integer, TreeSet<Long>> e : varCntrs.entrySet()) {
int p = e.getKey();
- long minCntr = e.getValue();
CounterWithNodes maxCntrObj = maxCntrs.get(p);
long maxCntr = maxCntrObj != null ? maxCntrObj.cnt : 0;
- // If minimal counter is zero, do clean preloading.
- if (minCntr == 0 || minCntr == maxCntr)
+ NavigableSet<Long> nonMaxCntrs = e.getValue().headSet(maxCntr, false);
+
+ // If the minimal counter equals the maximum then a historical supplier is not necessary.
+ if (nonMaxCntrs.isEmpty())
continue;
+ T2<UUID, Long> deepestReserved = new T2<>(null, Long.MAX_VALUE);
+
if (localReserved != null) {
Long localHistCntr = localReserved.get(p);
- if (localHistCntr != null && localHistCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) {
- partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localHistCntr);
+ if (localHistCntr != null && maxCntrObj.nodes.contains(cctx.localNodeId())) {
+ Long ceilingMinReserved = nonMaxCntrs.ceiling(localHistCntr);
- haveHistory.add(p);
+ if (ceilingMinReserved != null) {
+ partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, ceilingMinReserved);
- continue;
+ haveHistory.add(p);
+ }
+
+ if (deepestReserved.get2() > localHistCntr)
+ deepestReserved.set(cctx.localNodeId(), localHistCntr);
}
}
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
- if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) {
- partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);
+ if (histCntr != null && maxCntrObj.nodes.contains(e0.getKey())) {
+ Long ceilingMinReserved = nonMaxCntrs.ceiling(histCntr);
- haveHistory.add(p);
+ if (ceilingMinReserved != null) {
+ partHistSuppliers.put(e0.getKey(), top.groupId(), p, ceilingMinReserved);
- break;
+ haveHistory.add(p);
+ }
+
+ if (deepestReserved.get2() > histCntr)
+ deepestReserved.set(e0.getKey(), histCntr);
}
}
}
-
- Map<Integer, Set<UUID>> ownersByUpdCounters = new HashMap<>(maxCntrs.size());
- for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
- ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
-
- Map<Integer, Long> partSizes = new HashMap<>(maxCntrs.size());
- for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
- partSizes.put(e.getKey(), e.getValue().size);
-
- top.globalPartSizes(partSizes);
-
- Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory, this);
-
- for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
- UUID nodeId = e.getKey();
- Set<Integer> parts = e.getValue();
-
- for (int part : parts)
- partsToReload.put(nodeId, top.groupId(), part);
- }
}
/**
@@ -3692,7 +3737,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
if (activateCluster() || changedBaseline())
- assignPartitionsStates();
+ assignPartitionsStates(true);
DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent) firstDiscoEvt).customMessage();
@@ -3703,18 +3748,20 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (!F.isEmpty(caches))
resetLostPartitions(caches);
- assignPartitionsStates();
+ assignPartitionsStates(true);
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
&& ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions()) {
markAffinityReassign();
- assignPartitionsStates();
+ assignPartitionsStates(true);
}
}
else if (exchCtx.events().hasServerJoin())
- assignPartitionsStates();
+ assignPartitionsStates(true);
+ else if (exchCtx.events().hasServerLeft())
+ assignPartitionsStates(false);
// Recalculate new affinity based on partitions availability.
if (!exchCtx.mergeExchanges() && forceAffReassignment) {
@@ -4002,9 +4049,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
- *
+ * @param resetOwners True if partition states need to be reset, false otherwise.
*/
- private void assignPartitionsStates() {
+ private void assignPartitionsStates(boolean resetOwners) {
try {
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
@@ -4016,10 +4063,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
? grpCtx.topology()
: cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
- if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+ if (CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
+ assignPartitionStates(top, resetOwners);
+ else if (resetOwners)
assignPartitionSizes(top);
- else
- assignPartitionStates(top);
return null;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 17a500b..23ff455 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -44,14 +44,15 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_REBALANCING_CANCELLATION_OPTIMIZATION;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
/**
@@ -249,10 +250,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
ClusterNode histSupplier = null;
if (grp.persistenceEnabled() && exchFut != null) {
- UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
+ List<UUID> nodeIds = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
- if (nodeId != null)
- histSupplier = ctx.discovery().node(nodeId);
+ if (!F.isEmpty(nodeIds))
+ histSupplier = ctx.discovery().node(nodeIds.get(p % nodeIds.size()));
}
if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
index 6755f28..427aad8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -19,7 +19,10 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.T2;
@@ -50,11 +53,13 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
* @param grpId Group ID.
* @param partId Partition ID.
* @param cntrSince Partition update counter since history supplying is requested.
- * @return Supplier UUID.
+ * @return List of supplier UUIDs or an empty list if there are none.
*/
- @Nullable public synchronized UUID getSupplier(int grpId, int partId, long cntrSince) {
+ public synchronized List<UUID> getSupplier(int grpId, int partId, long cntrSince) {
if (map == null)
- return null;
+ return Collections.emptyList();
+
+ List<UUID> suppliers = new ArrayList<>();
for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet()) {
UUID supplierNode = e.getKey();
@@ -62,10 +67,10 @@ public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
Long historyCounter = e.getValue().get(new T2<>(grpId, partId));
if (historyCounter != null && historyCounter <= cntrSince)
- return supplierNode;
+ suppliers.add(supplierNode);
}
- return null;
+ return suppliers;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
new file mode 100644
index 0000000..4432c58
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalRebalanceRestartTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * This test checks that historical rebalance can be restarted after being cancelled for some reason.
+ */
+@WithSystemProperty(key = IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
+public class WalRebalanceRestartTest extends GridCommonAbstractTest {
+
+ /** Version of progressing rebalance. */
+ private volatile AffinityTopologyVersion rebTopVer = null;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setConsistentId(igniteInstanceName)
+ .setCommunicationSpi(new TestRecordingCommunicationSpi())
+ .setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 16))
+ .setBackups(2))
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setMaxSize(200L * 1024 * 1024)
+ .setPersistenceEnabled(true)));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * Restart rebalance manually.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testForceReassignment() throws Exception {
+ restartRebalance((ignite) -> {
+ IgniteFuture manualRebFut = ignite.cache(DEFAULT_CACHE_NAME).rebalance();
+ }, false);
+ }
+
+ /**
+ * Restart rebalance when another server joined and baseline changed.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAnotherServerJoinedAndChangeBlt() throws Exception {
+ restartRebalance((ignite) -> {
+ startGrid("new_srv");
+
+ resetBaselineTopology();
+ }, true);
+ }
+
+ /**
+ * Restart rebalance when another server joined.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testAnotherServerJoined() throws Exception {
+ restartRebalance((ignite) -> {
+ startGrid("new_srv");
+ }, true);
+ }
+
+ /**
+ * Restart rebalance when new cache started.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testCacheStarted() throws Exception {
+ restartRebalance((ignite) -> {
+ ignite.getOrCreateCache("new_" + DEFAULT_CACHE_NAME);
+ }, true);
+ }
+
+ /**
+ * Restart rebalance when one of the suppliers has left the topology.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testStopSupplier() throws Exception {
+ restartRebalance((ignite) -> {
+ stopFirstFoundSupplier(ignite);
+ }, true);
+ }
+
+ /**
+ * This test starts an empty node and stops one of the suppliers during a rebalance,
+ * so that the historical rebalance has to recover twice.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testStartNewNodeAndStopSupplier() throws Exception {
+ restartRebalance((ignite) -> {
+ startGrid("new_srv");
+
+ resetBaselineTopology();
+
+ waitForRebalanceOnLastDiscoTopology(ignite);
+
+ stopFirstFoundSupplier(ignite);
+ }, true);
+ }
+
+ /**
+ * Waits until the recorded rebalance topology version ({@code rebTopVer}, null until a demand message is recorded) equals the ready affinity version of the given node.
+ *
+ * @param ignite Ignite.
+ * @throws IgniteInterruptedCheckedException if failed.
+ */
+ private void waitForRebalanceOnLastDiscoTopology(IgniteEx ignite) throws IgniteInterruptedCheckedException {
+ AffinityTopologyVersion readyAffinity = ignite.context().cache().context().exchange().readyAffinityVersion();
+
+ assertTrue("Can not wait for rebalance topology [cur=" + rebTopVer + ", expect: " + readyAffinity + ']',
+ GridTestUtils.waitForCondition(() -> readyAffinity.equals(rebTopVer),
+ 10_000));
+ }
+
+ /**
+ * Stop supplier and start new node.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testStopSupplierAndStartNewNode() throws Exception {
+ restartRebalance((ignite) -> {
+ stopFirstFoundSupplier(ignite);
+
+ waitForRebalanceOnLastDiscoTopology(ignite);
+
+ startGrid("new_srv");
+
+ resetBaselineTopology();
+ }, true);
+ }
+
+ /**
+ * Stops the node whose ID is the first key of the {@code remaining} map read from the given node's current rebalance future.
+ *
+ * @param ignite Ignite.
+ */
+ private void stopFirstFoundSupplier(IgniteEx ignite) {
+ IgniteInternalFuture<?> rebFut = ignite.cachex(DEFAULT_CACHE_NAME).context().preloader().rebalanceFuture();
+
+ assertFalse(rebFut.isDone());
+
+ Map<UUID, IgniteDhtDemandedPartitionsMap> remaining = U.field(rebFut, "remaining");
+
+ assertFalse(remaining.isEmpty());
+
+ UUID supplierId = remaining.keySet().iterator().next();
+
+ info("First supplier: " + supplierId);
+
+ for (Ignite ign : G.allGrids()) {
+ if (ign.cluster().localNode().id().equals(supplierId))
+ ign.close();
+ }
+ }
+
+ /**
+ * Hangs a rebalance on one node, invokes a trigger and checks how the rebalance is restarted.
+ *
+ * @param retrigger Rebalance trigger.
+ * @param retriggerAsHistorical True means rebalance will be restarted as historical, false is as full.
+ * @throws Exception if failed.
+ */
+ private void restartRebalance(RebalanceRetrigger retrigger, boolean retriggerAsHistorical) throws Exception {
+ IgniteEx ignite0 = startGrids(4);
+
+ ignite0.cluster().active(true);
+
+ try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+ streamer.allowOverwrite(true);
+
+ for (int i = 0; i < 1000; i++)
+ streamer.addData(i, String.valueOf(i));
+ }
+
+ awaitPartitionMapExchange();
+ forceCheckpoint();
+
+ ignite(2).close();
+
+ try (IgniteDataStreamer streamer = ignite0.dataStreamer(DEFAULT_CACHE_NAME)) {
+ streamer.allowOverwrite(true);
+
+ for (int i = 1000; i < 2000; i++)
+ streamer.addData(i, String.valueOf(i));
+ }
+
+ awaitPartitionMapExchange();
+ forceCheckpoint();
+
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(2));
+
+ TestRecordingCommunicationSpi spi2 = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ AtomicBoolean hasFullRebalance = new AtomicBoolean();
+
+ spi2.record((node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+ if (CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId()) {
+ if (rebTopVer == null || rebTopVer.before(demandMsg.topologyVersion()))
+ rebTopVer = demandMsg.topologyVersion();
+
+ if (!F.isEmpty(demandMsg.partitions().fullSet()))
+ hasFullRebalance.compareAndSet(false, true);
+ }
+
+ }
+
+ return false;
+ });
+
+ spi2.blockMessages((node, msg) -> {
+ if (msg instanceof GridDhtPartitionDemandMessage) {
+ GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage)msg;
+
+ if (CU.cacheId(DEFAULT_CACHE_NAME) == demandMsg.groupId())
+ return true;
+ }
+
+ return false;
+
+ });
+
+ IgniteEx ignite2 = startGrid(optimize(cfg));
+
+ spi2.waitForBlocked();
+
+ assertFalse(hasFullRebalance.get());
+
+ retrigger.trigger(ignite2);
+
+ spi2.stopBlock();
+
+ awaitPartitionMapExchange();
+
+ if (retriggerAsHistorical)
+ assertFalse(hasFullRebalance.get());
+ else
+ assertTrue(hasFullRebalance.get());
+ }
+
+ /**
+ * Rebalance trigger interface.
+ */
+ private static interface RebalanceRetrigger {
+ /**
+ * Trigger some action.
+ *
+ * @param ignite Ignite.
+ * @throws Exception If issue happened.
+ */
+ public void trigger(IgniteEx ignite) throws Exception;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 7dbc157..afe82f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -376,15 +376,17 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
prim.cache(DEFAULT_CACHE_NAME).put(keys.get(0), keys.get(0));
- TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(prim);
+ TestRecordingCommunicationSpi spiPrim = TestRecordingCommunicationSpi.spi(prim);
+ TestRecordingCommunicationSpi spiBack = TestRecordingCommunicationSpi.spi(backups.get(1));
- spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+ spiPrim.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
+ spiBack.blockMessages((node, msg) -> msg instanceof GridDhtPartitionSupplyMessage);
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
try {
- spi.waitForBlocked();
+ GridTestUtils.waitForCondition(() -> spiPrim.hasBlockedMessages() || spiBack.hasBlockedMessages(), 10_000);
}
- catch (InterruptedException e) {
+ catch (Exception e) {
fail(X.getFullStackTrace(e));
}
@@ -393,7 +395,8 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
doSleep(2000);
// Ensure queue cleanup is triggered before releasing supply message.
- spi.stopBlock();
+ spiPrim.stopBlock();
+ spiBack.stopBlock();
});
startGrid(backups.get(0).name());
@@ -440,33 +443,19 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
cache.put(p1Keys.get(1), 2);
cache.put(p2Keys.get(1), 1); // Will be fully rebalanced.
- TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
-
- // Prevent rebalance completion.
- spi.blockMessages((node, msg) -> {
- String name = (String)node.attributes().get(ATTR_IGNITE_INSTANCE_NAME);
-
- if (name.equals(backupName) && msg instanceof GridDhtPartitionSupplyMessage) {
- GridDhtPartitionSupplyMessage msg0 = (GridDhtPartitionSupplyMessage)msg;
-
- if (msg0.groupId() != CU.cacheId(DEFAULT_CACHE_NAME))
- return false;
-
- Map<Integer, CacheEntryInfoCollection> infos = U.field(msg0, "infos");
-
- return infos.keySet().contains(primaryParts[0]); // Delay historical rebalance.
- }
+ List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
- return false;
- });
+ for (Ignite ignite: G.allGrids())
+ spis.add(blockSupplyFromNode(ignite, primaryParts, backupName));
backup = startGrid(backupName);
- spi.waitForBlocked();
+ GridTestUtils.waitForCondition(() -> spis.stream().anyMatch(TestRecordingCommunicationSpi::hasBlockedMessages),
+ 10_000);
forceCheckpoint(backup);
- spi.stopBlock();
+ spis.stream().forEach(TestRecordingCommunicationSpi::stopBlock);
// While message is delayed topology version shouldn't change to ideal.
awaitPartitionMapExchange();
@@ -524,30 +513,26 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
stopAllGrids();
- crd = startGrid(0);
+ crd = startNodeWithBlockingSupplying(0);
startGrid(1);
- startGrid(2);
-
- TestRecordingCommunicationSpi crdSpi = TestRecordingCommunicationSpi.spi(crd);
+ startNodeWithBlockingSupplying(2);
- // Block all rebalance from crd.
- crdSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
- @Override public boolean apply(ClusterNode node, Message msg) {
- return msg instanceof GridDhtPartitionSupplyMessage;
- }
- });
+ crd.cluster().active(true);
- crd.cluster().active(true); // Rebalancing is triggered on activation.
+ TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(crd);
+ TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(ignite(2));
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
try {
- crdSpi.waitForBlocked();
+ GridTestUtils.waitForCondition(() -> spi0.hasBlockedMessages() || spi2.hasBlockedMessages(), 10_000);
// Stop before supplying rebalance. New rebalance must start with second backup as supplier
// doing full rebalance.
stopGrid(primName);
+
+ spi2.stopBlock();
}
- catch (InterruptedException e) {
+ catch (Exception e) {
fail();
}
});
@@ -572,6 +557,21 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
}
/**
+ * Starts a node whose {@link TestRecordingCommunicationSpi} is set, before the node is started, to block
+ * {@link GridDhtPartitionSupplyMessage}s; the block is registered with the test instance name of index {@code 1}.
+ * NOTE(review): presumably that name identifies the node the blocked messages are sent to - confirm.
+ *
+ * @param idx Starting node index.
+ * @return Ignite.
+ * @throws Exception If failed.
+ */
+ private Ignite startNodeWithBlockingSupplying(int idx) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));
+
+ TestRecordingCommunicationSpi spi = (TestRecordingCommunicationSpi)cfg.getCommunicationSpi();
+
+ spi.blockMessages(GridDhtPartitionSupplyMessage.class, getTestIgniteInstanceName(1));
+
+ return startGrid(optimize(cfg));
+ }
+
+ /**
* Test reproduces the problem scenario and checks that, if a node joins after missing some updates, no partition inconsistency happens.
*
* @throws Exception If failed.
@@ -1346,7 +1346,35 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
cache.remove(keys.get(1));
- TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
+ List<TestRecordingCommunicationSpi> spis = new ArrayList<>();
+
+ for (Ignite ignite: G.allGrids())
+ spis.add(blockSupplyFromNode(ignite, primaryParts, backupName));
+
+ startGrid(backupName);
+
+ GridTestUtils.waitForCondition(() -> spis.stream().anyMatch(TestRecordingCommunicationSpi::hasBlockedMessages),
+ 10_000);
+
+ rebBlockClo.accept(backupName);
+
+ spis.stream().forEach(TestRecordingCommunicationSpi::stopBlock);
+
+ rebUnblockClo.accept(backupName);
+
+ awaitPartitionMapExchange();
+
+ assertPartitionsSame(idleVerify(crd, DEFAULT_CACHE_NAME));
+ }
+
+ /**
+ * @param ingine Ignite.
+ * @param primaryParts Array of partitions.
+ * @param backupName Name of node for which supply messages will be blocked.
+ * @return Test communication SPI.
+ */
+ private TestRecordingCommunicationSpi blockSupplyFromNode(Ignite ingine, int[] primaryParts, String backupName) {
+ TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(ingine);
// Prevent rebalance completion.
spi.blockMessages((node, msg) -> {
@@ -1365,20 +1393,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
return false;
});
-
- startGrid(backupName);
-
- spi.waitForBlocked();
-
- rebBlockClo.accept(backupName);
-
- spi.stopBlock();
-
- rebUnblockClo.accept(backupName);
-
- awaitPartitionMapExchange();
-
- assertPartitionsSame(idleVerify(crd, DEFAULT_CACHE_NAME));
+ return spi;
}
/** */
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index e9e25ca..17ca8a4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsParti
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsStartWIthEmptyArchive;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.HistoricalReservationTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRebalanceRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManagerTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.SharedPageLockTrackerTest;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.dumpprocessors.ToFileDumpProcessorTest;
@@ -86,6 +87,7 @@ public class IgnitePdsTestSuite4 {
GridTestUtils.addTestIfNeeded(suite, RebalanceCancellationTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, NotOptimizedRebalanceTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, BreakRebalanceChainTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, WalRebalanceRestartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRestartAfterFailedToWriteMetaPageTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgnitePdsRemoveDuringRebalancingTest.class, ignoredTests);