You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/12/08 13:32:59 UTC
[ignite-3] branch main updated: IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6e4d35e579 IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)
6e4d35e579 is described below
commit 6e4d35e5797dd7ebed0fb2259d69c6fc3dd22730
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Dec 8 17:32:54 2022 +0400
IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)
---
.../management/ClusterManagementGroupManager.java | 30 +++++++++++++-
.../management/raft/CmgRaftGroupListenerTest.java | 2 +-
.../ignite/network/DefaultMessagingService.java | 32 +++++++++++++++
.../internal/AbstractClusterIntegrationTest.java | 4 +-
.../internal/compute/ItLogicalTopologyTest.java | 48 +++++++++++++++++++++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 13 ++++++
6 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 7190955a46..d47feaa30d 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -125,6 +125,20 @@ public class ClusterManagementGroupManager implements IgniteComponent {
/** Handles cluster initialization flow. */
private final ClusterInitializer clusterInitializer;
+ /**
+ * Whether we attempted to complete the join (i.e. send the JoinReady command) on Ignite node start.
+ *
+ * <p>Such join completion always happens during startup, and it is always the last step of the startup process,
+ * to make sure a node joins the cluster only when it's fully ready.
+ *
+ * <p>We need this flag to make sure we handle automatic rejoins correctly. If a short network hiccup happens, the CMG leader
+ * might lose sight of our node, hence the node will be removed from the physical and then from the logical topology. When
+ * the network connectivity is restored, the node will appear in the physical topology, after which it will try to
+ * rejoin the cluster. If such 'rejoin' was carried out unconditionally, it could happen before the first join during
+ * startup, so a not-yet-ready node could join the cluster.
+ */
+ private volatile boolean attemptedCompleteJoinOnStart = false;
+
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
@@ -420,7 +434,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
if (service != null && service.nodeNames().equals(state.cmgNodes())) {
LOG.info("ClusterStateMessage received, but the CMG service is already started");
- return completedFuture(service);
+ return joinCluster(service, state.clusterTag());
}
if (service == null) {
@@ -448,11 +462,21 @@ public class ClusterManagementGroupManager implements IgniteComponent {
return initCmgRaftService(state);
})
- .thenCompose(Function.identity());
+ .thenCompose(Function.identity())
+ .thenCompose(this::completeJoinIfTryingToRejoin);
}
}
}
+ private CompletableFuture<CmgRaftService> completeJoinIfTryingToRejoin(CmgRaftService cmgRaftService) {
+ if (attemptedCompleteJoinOnStart) {
+ return cmgRaftService.completeJoinCluster()
+ .thenApply(unused -> cmgRaftService);
+ } else {
+ return completedFuture(cmgRaftService);
+ }
+ }
+
private CompletableFuture<CmgRaftService> joinCluster(CmgRaftService service, ClusterTag clusterTag) {
return service.startJoinCluster(clusterTag)
.thenApply(v -> service)
@@ -709,6 +733,8 @@ public class ClusterManagementGroupManager implements IgniteComponent {
return failedFuture(new NodeStoppingException());
}
+ attemptedCompleteJoinOnStart = true;
+
try {
return raftServiceAfterJoin().thenCompose(CmgRaftService::completeJoinCluster);
} finally {
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 743b96c05d..c6ce9b65a9 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -122,7 +122,7 @@ public class CmgRaftGroupListenerTest {
}
@Test
- void unsuccessfulJoinReadyExecutesOnLogicalTopologyChanged() {
+ void unsuccessfulJoinReadyDoesNotExecuteOnLogicalTopologyChanged() {
listener.onWrite(iterator(msgFactory.joinReadyCommand().node(node).build()));
verify(onLogicalTopologyChanged, never()).accept(anyLong());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index e86a48a59e..a984ddcfdc 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/** Default messaging service implementation. */
public class DefaultMessagingService extends AbstractMessagingService {
@@ -89,6 +91,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
new NamedThreadFactory("MessagingService-inbound-", LOG)
);
+ @Nullable
+ private volatile BiPredicate<String, NetworkMessage> dropMessagePredicate;
+
/**
* Constructor.
*
@@ -161,6 +166,11 @@ public class DefaultMessagingService extends AbstractMessagingService {
return failedFuture(new NodeStoppingException());
}
+ BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;
+ if (dropMessage != null && dropMessage.test(recipient.name(), msg)) {
+ return new CompletableFuture<>();
+ }
+
InetSocketAddress recipientAddress = new InetSocketAddress(recipient.address().host(), recipient.address().port());
if (isSelf(recipient.name(), recipientAddress)) {
@@ -389,4 +399,26 @@ public class DefaultMessagingService extends AbstractMessagingService {
IgniteUtils.shutdownAndAwaitTermination(inboundExecutor, 10, TimeUnit.SECONDS);
IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
}
+
+ /**
+ * Installs a predicate that will be consulted for each message being sent; when it returns {@code true}, the
+ * message will be silently dropped (it will not be sent, and the corresponding future will never complete).
+ *
+ * @param predicate Predicate that will decide whether a message should be dropped. Its first argument is the recipient
+ * node's consistent ID.
+ */
+ @TestOnly
+ public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
+ dropMessagePredicate = predicate;
+ }
+
+ /**
+ * Stops dropping messages.
+ *
+ * @see #dropMessages(BiPredicate)
+ */
+ @TestOnly
+ public void stopDroppingMessages() {
+ dropMessagePredicate = null;
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 74dacefbe0..10fe272c58 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -117,7 +117,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
* @param testInfo Test info (used to build node name).
* @return Started Ignite node.
*/
- protected Ignite startNode(int nodeIndex, TestInfo testInfo) {
+ protected IgniteImpl startNode(int nodeIndex, TestInfo testInfo) {
CompletableFuture<Ignite> future = startNode0(nodeIndex, testInfo);
assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
@@ -133,7 +133,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
+ clusterNodes.size() + " nodes");
}
- return ignite;
+ return (IgniteImpl) ignite;
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index ce360d0f79..e048855e30 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -26,11 +26,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.AbstractClusterIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -93,7 +96,7 @@ class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
}
@Test
- void receivesLogicalTopologyEventsFromRestart(TestInfo testInfo) throws Exception {
+ void receivesLogicalTopologyEventsCausedByNodeRestart(TestInfo testInfo) throws Exception {
IgniteImpl entryNode = node(0);
Ignite secondIgnite = startNode(1, testInfo);
@@ -119,6 +122,49 @@ class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
assertThat(joinEvent.topologyVersion, is(4L));
}
+ @Test
+ void nodeReturnedToPhysicalTopologyReturnsToLogicalTopology(TestInfo testInfo) throws Exception {
+ IgniteImpl entryNode = node(0);
+
+ IgniteImpl secondIgnite = startNode(1, testInfo);
+
+ makeSecondNodeDisappearForFirstNode(entryNode, secondIgnite);
+
+ CountDownLatch secondIgniteAppeared = new CountDownLatch(1);
+
+ entryNode.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
+ @Override
+ public void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+ if (appearedNode.name().equals(secondIgnite.name())) {
+ secondIgniteAppeared.countDown();
+ }
+
+ }
+ });
+
+ entryNode.stopDroppingMessages();
+
+ assertTrue(secondIgniteAppeared.await(10, TimeUnit.SECONDS), "Did not see second node coming back in time");
+ }
+
+ private static void makeSecondNodeDisappearForFirstNode(IgniteImpl firstIgnite, IgniteImpl secondIgnite) throws InterruptedException {
+ CountDownLatch secondIgniteDisappeared = new CountDownLatch(1);
+
+ firstIgnite.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
+ @Override
+ public void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+ if (disappearedNode.name().equals(secondIgnite.name())) {
+ secondIgniteDisappeared.countDown();
+ }
+ }
+ });
+
+ firstIgnite.dropMessages((recipientConsistentId, message) ->
+ secondIgnite.node().name().equals(recipientConsistentId) && message instanceof ScaleCubeMessage);
+
+ assertTrue(secondIgniteDisappeared.await(10, TimeUnit.SECONDS), "Did not see second node leaving in time");
+ }
+
private static class Event {
private final boolean appeared;
private final ClusterNode node;
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c0a59f14ef..99b01fd1bc 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,6 +27,7 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.Ignite;
@@ -114,9 +115,11 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NodeMetadata;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
@@ -820,4 +823,14 @@ public class IgniteImpl implements Ignite {
public LogicalTopologyService logicalTopologyService() {
return logicalTopologyService;
}
+
+ @TestOnly
+ public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
+ ((DefaultMessagingService) clusterSvc.messagingService()).dropMessages(predicate);
+ }
+
+ @TestOnly
+ public void stopDroppingMessages() {
+ ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages();
+ }
}