You are viewing a plain-text version of this content; the canonical link ("here") is not preserved in this rendering.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/03/17 17:54:03 UTC
[ignite] branch master updated: IGNITE-13374 Initial PME hangs
because of multiple blinking nodes fixed. Fixes #8850
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ba2e717 IGNITE-13374 Initial PME hangs because of multiple blinking nodes fixed. Fixes #8850
ba2e717 is described below
commit ba2e717ba192f3512393c392d3470011ad6b8e66
Author: Alexander Lapin <la...@gmail.com>
AuthorDate: Wed Mar 17 20:53:15 2021 +0300
IGNITE-13374 Initial PME hangs because of multiple blinking nodes fixed. Fixes #8850
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../preloader/GridDhtPartitionsExchangeFuture.java | 78 +++++++++-------
.../ClientFastReplyCoordinatorFailureTest.java | 100 +++++++++++++++++++++
2 files changed, 146 insertions(+), 32 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b49dd73..032df1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -4551,6 +4551,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
crd = node; // Do not allow to process FullMessage from old coordinator.
+
+ processNonLocalCoordinatorChange(crd, node);
}
else {
if (log.isInfoEnabled()) {
@@ -5229,38 +5231,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
}
else {
- if (crdChanged) {
- for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
- if (crd0.equals(m.getKey())) {
- if (log.isInfoEnabled()) {
- log.info("Coordinator changed, process pending full message [" +
- "ver=" + initialVersion() +
- ", crd=" + node.id() +
- ", pendingMsgNode=" + m.getKey() + ']');
- }
-
- processFullMessage(true, m.getKey(), m.getValue());
-
- if (isDone())
- return;
- }
- }
-
- if (log.isInfoEnabled()) {
- log.info("Coordinator changed, send partitions to new coordinator [" +
- "ver=" + initialVersion() +
- ", crd=" + node.id() +
- ", newCrd=" + crd0.id() + ']');
- }
-
- final ClusterNode newCrd = crd0;
-
- cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
- @Override public void run() {
- sendPartitions(newCrd);
- }
- });
- }
+ if (crdChanged && processNonLocalCoordinatorChange(crd0, node))
+ return;
}
}
catch (IgniteCheckedException e) {
@@ -5723,6 +5695,48 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
}
/**
+ * Applies a pending full message from {@code crd}, if any; unless that completes the exchange, asynchronously sends local partitions to {@code crd}.
+ *
+ * @param crd New coordinator.
+ * @param node Node whose id is logged as {@code crd=}; the left node per the original doc (one caller passes {@code crd} itself - TODO confirm).
+ * @return {@code true} if the exchange is done after processing a pending full message (partitions are not sent then), {@code false} otherwise.
+ */
+ private boolean processNonLocalCoordinatorChange(ClusterNode crd, ClusterNode node) {
+ for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
+ if (crd.equals(m.getKey())) {
+ if (log.isInfoEnabled()) {
+ log.info("Coordinator changed, process pending full message [" +
+ "ver=" + initialVersion() +
+ ", crd=" + node.id() +
+ ", pendingMsgNode=" + m.getKey() + ']');
+ }
+
+ processFullMessage(true, m.getKey(), m.getValue());
+
+ if (isDone()) // The pending full message completed the exchange: nothing left to send.
+ return true;
+ }
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Coordinator changed, send partitions to new coordinator [" +
+ "ver=" + initialVersion() +
+ ", crd=" + node.id() +
+ ", newCrd=" + crd.id() + ']');
+ }
+
+ final ClusterNode newCrd = crd; // Local copy captured by the anonymous Runnable below.
+
+ cctx.kernalContext().getSystemExecutorService().submit(new Runnable() { // Sent on the system executor, not in the calling thread.
+ @Override public void run() {
+ sendPartitions(newCrd);
+ }
+ });
+
+ return false;
+ }
+
+ /**
*
*/
private static class FinishState {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
index 6616b96..17b93fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ClientFastReplyCoordinatorFailureTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
@@ -26,10 +27,14 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.After;
@@ -51,6 +56,12 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
/** Latch that will be triggered after blocking message from new server to old coordinator. */
private final CountDownLatch newSrvSingleMesssageLatch = new CountDownLatch(1);
+ /** */
+ private static final CountDownLatch PART_SINGLE_REQ_MSG_LATCH = new CountDownLatch(1);
+
+ /** */
+ private boolean delayNodeFailedMsg;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -77,6 +88,13 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
return false;
}
});
+
+ if (delayNodeFailedMsg) {
+ TcpDiscoverySpi spi = new TestDiscoverySpi();
+ spi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(spi);
+ }
}
else if (getTestIgniteInstanceName(3).equals(igniteInstanceName)) {
commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
@@ -94,6 +112,26 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
}
});
}
+ else if (delayNodeFailedMsg) {
+ commSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtPartitionsSingleRequest && node.isClient()) {
+ GridTestUtils.runAsync(() -> {
+ try {
+ Thread.sleep(1_000);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+
+ PART_SINGLE_REQ_MSG_LATCH.countDown();
+ });
+ }
+
+ return false;
+ }
+ });
+ }
cfg.setCommunicationSpi(commSpi);
@@ -135,4 +173,66 @@ public class ClientFastReplyCoordinatorFailureTest extends GridCommonAbstractTes
startFut.get();
}
+
+ /**
+ * Reproduces a race on the client between a GridDhtPartitionsSingleRequest that arrives and updates the coordinator
+ * and the onNodeLeft event for the same coordinator change, which should make the client resend its SingleMessage
+ * to the new coordinator. The test passes only if the client join then completes within the timeout.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClientRepeatedReply() throws Exception {
+ delayNodeFailedMsg = true;
+
+ startGrids(3);
+
+ awaitPartitionMapExchange();
+
+ // Server start will be blocked.
+ GridTestUtils.runAsync(() -> startGrid(3));
+
+ newSrvSingleMesssageLatch.await();
+
+ // Client join will be hanging on local join exchange.
+ IgniteInternalFuture<Ignite> startFut =
+ GridTestUtils.runAsync(() -> startClientGrid("client-1"));
+
+ clientSingleMesssageLatch.await();
+
+ stopGrid(0);
+
+ // Bounded wait: if the client hangs on its local join exchange, get(timeout) throws a timeout exception and
+ // the test fails instead of blocking forever (a blocking no-timeout get() inside a condition would never let
+ // a waitForCondition() timeout fire). A client start failure is logged and rethrown so that it fails the
+ // test too, rather than being swallowed.
+ try {
+ startFut.get(10_000);
+ }
+ catch (IgniteCheckedException e) {
+ error("Failed when waiting for client start future to complete", e);
+
+ throw e;
+ }
+ }
+
+ /**
+ * Used on the client node: holds back NODE_LEFT and NODE_FAILED discovery messages until
+ * {@code PART_SINGLE_REQ_MSG_LATCH} is released, so GridDhtPartitionsSingleRequest arrives before they are processed.
+ */
+ public static class TestDiscoverySpi extends TcpDiscoverySpi {
+ /** {@inheritDoc} */
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryNodeLeftMessage || msg instanceof TcpDiscoveryNodeFailedMessage) {
+ try {
+ PART_SINGLE_REQ_MSG_LATCH.await();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Preserve the interrupt (e.g. node stop) for the discovery worker.
+ }
+ }
+
+ super.startMessageProcess(msg);
+ }
+ }
}