You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2019/05/07 15:21:44 UTC
[ignite] 36/41: GG-17478 Fixed wrong assert on affinity initialization on node join
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch gg-18540
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 8baee8b280626e04157caa937e81a8b8b2ebbd8e
Author: Pavel Kovalenko <jo...@gmail.com>
AuthorDate: Mon May 6 12:21:52 2019 +0300
GG-17478 Fixed wrong assert on affinity initialization on node join
Signed-off-by: Pavel Kovalenko <jo...@gmail.com>
---
.../cache/CacheAffinitySharedManager.java | 2 +-
.../PartitionsExchangeCoordinatorFailoverTest.java | 147 +++++++++++++++++----
2 files changed, 120 insertions(+), 29 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 45f55da..29b4d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1543,7 +1543,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
CacheGroupContext grp = cctx.cache().cacheGroup(holder.groupId());
if (affReq != null && affReq.contains(aff.groupId())) {
- assert AffinityTopologyVersion.NONE.equals(aff.lastVersion()) : aff.lastVersion();
+ assert resTopVer.compareTo(aff.lastVersion()) >= 0 : aff.lastVersion();
CacheGroupAffinityMessage affMsg = receivedAff.get(aff.groupId());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
index fdef983..5cff138 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/PartitionsExchangeCoordinatorFailoverTest.java
@@ -16,12 +16,15 @@
package org.apache.ignite.internal.processors.cache;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -41,9 +44,9 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
@@ -55,8 +58,14 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
/** */
private static final String CACHE_NAME = "cache";
+ /** Coordinator node name. */
+ private static final String CRD_NONE = "crd";
+
+ /** */
+ private volatile Supplier<TcpCommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+
/** */
- private Supplier<CommunicationSpi> spiFactory = TcpCommunicationSpi::new;
+ private boolean newCaches = true;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
@@ -64,7 +73,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
cfg.setConsistentId(igniteInstanceName);
- cfg.setCommunicationSpi(spiFactory.get());
+ cfg.setCommunicationSpi(spiFactory.get().setName("tcp"));
cfg.setCacheConfiguration(
new CacheConfiguration(CACHE_NAME)
@@ -73,7 +82,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
);
// Add cache that exists only on coordinator node.
- if (igniteInstanceName.equals("crd")) {
+ if (newCaches && igniteInstanceName.equals(CRD_NONE)) {
IgnitePredicate<ClusterNode> nodeFilter = node -> node.consistentId().equals(igniteInstanceName);
cfg.setCacheConfiguration(
@@ -106,7 +115,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
public void testNewCoordinatorCompletedExchange() throws Exception {
spiFactory = TestRecordingCommunicationSpi::new;
- IgniteEx crd = (IgniteEx) startGrid("crd");
+ IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
@@ -121,13 +130,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
// Block FullMessage for newly joined nodes.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(crd);
- final CountDownLatch sendFullMsgLatch = new CountDownLatch(1);
+ final CountDownLatch sndFullMsgLatch = new CountDownLatch(1);
// Delay sending full message to newly joined nodes.
spi.blockMessages((node, msg) -> {
if (msg instanceof GridDhtPartitionsFullMessage && node.order() > 2) {
try {
- sendFullMsgLatch.await();
+ sndFullMsgLatch.await();
}
catch (Throwable ignored) { }
@@ -155,13 +164,13 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
getTestTimeout()
);
- IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid("crd", true, false));
+ IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE, true, false));
// Magic sleep to make sure that coordinator stop process has started.
U.sleep(1000);
// Resume full messages sending to unblock coordinator stopping process.
- sendFullMsgLatch.countDown();
+ sndFullMsgLatch.countDown();
// Coordinator stop should succeed.
stopCrdFut.get();
@@ -192,7 +201,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
public void testDelayedFullMessageReplacedIfCoordinatorChanged() throws Exception {
spiFactory = TestRecordingCommunicationSpi::new;
- IgniteEx crd = startGrid("crd");
+ IgniteEx crd = startGrid(CRD_NONE);
IgniteEx newCrd = startGrid(1);
@@ -202,7 +211,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
awaitPartitionMapExchange();
- blockSendingFullMessage(crd, problemNode);
+ blockSendingFullMessage(crd, node -> node.equals(problemNode.localNode()));
IgniteInternalFuture joinNextNodeFut = GridTestUtils.runAsync(() -> startGrid(3));
@@ -210,11 +219,11 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
U.sleep(5000);
- blockSendingFullMessage(newCrd, problemNode);
+ blockSendingFullMessage(newCrd, node -> node.equals(problemNode.localNode()));
- IgniteInternalFuture stopCoordinatorFut = GridTestUtils.runAsync(() -> stopGrid("crd"));
+ IgniteInternalFuture stopCrdFut = GridTestUtils.runAsync(() -> stopGrid(CRD_NONE));
- stopCoordinatorFut.get();
+ stopCrdFut.get();
U.sleep(5000);
@@ -237,9 +246,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
final int delay = 5_000;
if (msg instanceof GridDhtPartitionDemandMessage) {
- GridDhtPartitionDemandMessage demandMessage = (GridDhtPartitionDemandMessage) msg;
+ GridDhtPartitionDemandMessage demandMsg = (GridDhtPartitionDemandMessage) msg;
- if (demandMessage.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
+ if (demandMsg.groupId() == GridCacheUtils.cacheId(GridCacheUtils.UTILITY_CACHE_NAME))
return 0;
return delay;
@@ -248,7 +257,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
return 0;
});
- final IgniteEx crd = startGrid("crd");
+ final IgniteEx crd = startGrid(CRD_NONE);
startGrid(1);
@@ -293,7 +302,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
U.sleep(2_500);
// And then stop coordinator node.
- stopGrid("crd", true);
+ stopGrid(CRD_NONE, true);
startNodeFut.get();
@@ -314,11 +323,93 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
}
/**
+ * Test checks that changing coordinator to a node that joining to cluster at the moment works correctly
+ * in case of exchanges merge and completed exchange on other joining nodes.
+ */
+ @Test
+ @WithSystemProperty(key = IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK, value = "true")
+ public void testChangeCoordinatorToLocallyJoiningNode() throws Exception {
+ newCaches = false;
+
+ spiFactory = TestRecordingCommunicationSpi::new;
+
+ IgniteEx crd = startGrid(CRD_NONE);
+
+ final int newCrdNodeIdx = 1;
+
+ // A full message shouldn't be send to new coordinator.
+ blockSendingFullMessage(crd, node -> node.consistentId().equals(getTestIgniteInstanceName(newCrdNodeIdx)));
+
+ CountDownLatch joiningNodeSentSingleMsg = new CountDownLatch(1);
+
+ // For next joining node delay sending single message to emulate exchanges merge.
+ spiFactory = () -> new DynamicDelayingCommunicationSpi(msg -> {
+ final int delay = 5_000;
+
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage) msg;
+
+ if (singleMsg.exchangeId() != null) {
+ joiningNodeSentSingleMsg.countDown();
+
+ return delay;
+ }
+ }
+
+ return 0;
+ });
+
+ IgniteInternalFuture<?> newCrdJoinFut = GridTestUtils.runAsync(() -> startGrid(newCrdNodeIdx));
+
+ // Wait till new coordinator node sent single message.
+ joiningNodeSentSingleMsg.await();
+
+ spiFactory = TcpCommunicationSpi::new;
+
+ // Additionally start 2 new nodes. Their exchange should be merged with exchange on join new coordinator node.
+ startGridsMultiThreaded(2, 2);
+
+ Assert.assertFalse("New coordinator join shouldn't be happened before stopping old coordinator.",
+ newCrdJoinFut.isDone());
+
+ // Stop coordinator.
+ stopGrid(CRD_NONE);
+
+ // New coordinator join process should succeed after that.
+ newCrdJoinFut.get();
+
+ awaitPartitionMapExchange();
+
+ // Check that affinity are equal on all nodes.
+ AffinityTopologyVersion affVer = ((IgniteEx) ignite(1)).cachex(CACHE_NAME)
+ .context().shared().exchange().readyAffinityVersion();
+
+ List<List<ClusterNode>> expAssignment = null;
+ IgniteEx expAssignmentNode = null;
+
+ for (Ignite node : G.allGrids()) {
+ IgniteEx nodeEx = (IgniteEx) node;
+
+ List<List<ClusterNode>> assignment = nodeEx.cachex(CACHE_NAME).context().affinity().assignments(affVer);
+
+ if (expAssignment == null) {
+ expAssignment = assignment;
+ expAssignmentNode = nodeEx;
+ }
+ else
+ Assert.assertEquals("Affinity assignments are different " +
+ "[expectedNode=" + expAssignmentNode + ", actualNode=" + nodeEx + "]", expAssignment, assignment);
+ }
+ }
+
+ /**
* Blocks sending full message from coordinator to non-coordinator node.
+ *
* @param from Coordinator node.
- * @param to Non-coordinator node.
+ * @param pred Non-coordinator node predicate.
+ * If predicate returns {@code true} a full message will not be send to that node.
*/
- private void blockSendingFullMessage(IgniteEx from, IgniteEx to) {
+ private void blockSendingFullMessage(IgniteEx from, Predicate<ClusterNode> pred) {
// Block FullMessage for newly joined nodes.
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(from);
@@ -327,8 +418,8 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
if (msg instanceof GridDhtPartitionsFullMessage) {
GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage) msg;
- if (fullMsg.exchangeId() != null && node.order() == to.localNode().order()) {
- log.warning("Blocked sending " + msg + " to " + to.localNode());
+ if (fullMsg.exchangeId() != null && pred.test(node)) {
+ log.warning("Blocked sending " + msg + " to " + node);
return true;
}
@@ -341,9 +432,9 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
/**
* Communication SPI that allows to delay sending message by predicate.
*/
- class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
+ static class DynamicDelayingCommunicationSpi extends TcpCommunicationSpi {
/** Function that returns delay in milliseconds for given message. */
- private final Function<Message, Integer> delayMessageFunc;
+ private final Function<Message, Integer> delayMsgFunc;
/** */
DynamicDelayingCommunicationSpi() {
@@ -351,10 +442,10 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
}
/**
- * @param delayMessageFunc Function to calculate delay for message.
+ * @param delayMsgFunc Function to calculate delay for message.
*/
- DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMessageFunc) {
- this.delayMessageFunc = delayMessageFunc;
+ DynamicDelayingCommunicationSpi(final Function<Message, Integer> delayMsgFunc) {
+ this.delayMsgFunc = delayMsgFunc;
}
/** {@inheritDoc} */
@@ -363,7 +454,7 @@ public class PartitionsExchangeCoordinatorFailoverTest extends GridCommonAbstrac
try {
GridIoMessage ioMsg = (GridIoMessage)msg;
- int delay = delayMessageFunc.apply(ioMsg.message());
+ int delay = delayMsgFunc.apply(ioMsg.message());
if (delay > 0) {
log.warning(String.format("Delay sending %s to %s", msg, node));