You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@ignite.apache.org by tk...@apache.org on 2022/12/08 13:32:59 UTC

[ignite-3] branch main updated: IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e4d35e579 IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)
6e4d35e579 is described below

commit 6e4d35e5797dd7ebed0fb2259d69c6fc3dd22730
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Dec 8 17:32:54 2022 +0400

    IGNITE-18321 Return node to logical topology as soon as it gets returned to physical topology (#1412)
---
 .../management/ClusterManagementGroupManager.java  | 30 +++++++++++++-
 .../management/raft/CmgRaftGroupListenerTest.java  |  2 +-
 .../ignite/network/DefaultMessagingService.java    | 32 +++++++++++++++
 .../internal/AbstractClusterIntegrationTest.java   |  4 +-
 .../internal/compute/ItLogicalTopologyTest.java    | 48 +++++++++++++++++++++-
 .../org/apache/ignite/internal/app/IgniteImpl.java | 13 ++++++
 6 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 7190955a46..d47feaa30d 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -125,6 +125,20 @@ public class ClusterManagementGroupManager implements IgniteComponent {
     /** Handles cluster initialization flow. */
     private final ClusterInitializer clusterInitializer;
 
+    /**
+     * Whether we attempted to complete the join (i.e. send the JoinReady command) on Ignite node start.
+     *
+     * <p>Such join completion always happens during startup, and it is always the last step of the startup process,
+     * to make sure a node joins the cluster only when it's fully ready.
+     *
+     * <p>We need this flag to make sure we handle automatic rejoins correctly. If a short network hiccup happens, the CMG leader
+     * might lose sight of our node, hence the node will be removed from the physical and then from the logical topology. When
+     * the network connectivity is restored, the node will appear in the physical topology, after which it will try to
+     * rejoin the cluster. If such a 'rejoin' was carried out unconditionally, it could happen before the first join during
+     * startup, so a not-yet-ready node could join the cluster.
+     */
+    private volatile boolean attemptedCompleteJoinOnStart = false;
+
     /** Constructor. */
     public ClusterManagementGroupManager(
             VaultManager vault,
@@ -420,7 +434,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                             if (service != null && service.nodeNames().equals(state.cmgNodes())) {
                                 LOG.info("ClusterStateMessage received, but the CMG service is already started");
 
-                                return completedFuture(service);
+                                return joinCluster(service, state.clusterTag());
                             }
 
                             if (service == null) {
@@ -448,11 +462,21 @@ public class ClusterManagementGroupManager implements IgniteComponent {
 
                             return initCmgRaftService(state);
                         })
-                        .thenCompose(Function.identity());
+                        .thenCompose(Function.identity())
+                        .thenCompose(this::completeJoinIfTryingToRejoin);
             }
         }
     }
 
+    private CompletableFuture<CmgRaftService> completeJoinIfTryingToRejoin(CmgRaftService cmgRaftService) {
+        if (attemptedCompleteJoinOnStart) { // Startup join was already attempted, so this is a rejoin: complete it now.
+            return cmgRaftService.completeJoinCluster()
+                    .thenApply(unused -> cmgRaftService);
+        } else { // Still starting: the join will be completed as the last startup step.
+            return completedFuture(cmgRaftService);
+        }
+    }
+
     private CompletableFuture<CmgRaftService> joinCluster(CmgRaftService service, ClusterTag clusterTag) {
         return service.startJoinCluster(clusterTag)
                 .thenApply(v -> service)
@@ -709,6 +733,8 @@ public class ClusterManagementGroupManager implements IgniteComponent {
             return failedFuture(new NodeStoppingException());
         }
 
+        attemptedCompleteJoinOnStart = true;
+
         try {
             return raftServiceAfterJoin().thenCompose(CmgRaftService::completeJoinCluster);
         } finally {
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 743b96c05d..c6ce9b65a9 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -122,7 +122,7 @@ public class CmgRaftGroupListenerTest {
     }
 
     @Test
-    void unsuccessfulJoinReadyExecutesOnLogicalTopologyChanged() {
+    void unsuccessfulJoinReadyDoesNotExecuteOnLogicalTopologyChanged() {
         listener.onWrite(iterator(msgFactory.joinReadyCommand().node(node).build()));
 
         verify(onLogicalTopologyChanged, never()).accept(anyLong());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index e86a48a59e..a984ddcfdc 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
 import java.util.function.Function;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /** Default messaging service implementation. */
 public class DefaultMessagingService extends AbstractMessagingService {
@@ -89,6 +91,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
             new NamedThreadFactory("MessagingService-inbound-", LOG)
     );
 
+    @Nullable
+    private volatile BiPredicate<String, NetworkMessage> dropMessagePredicate; // Test-only; null means no message is dropped.
+
     /**
      * Constructor.
      *
@@ -161,6 +166,11 @@ public class DefaultMessagingService extends AbstractMessagingService {
             return failedFuture(new NodeStoppingException());
         }
 
+        BiPredicate<String, NetworkMessage> dropMessage = dropMessagePredicate;
+        if (dropMessage != null && dropMessage.test(recipient.name(), msg)) {
+            return new CompletableFuture<>();
+        }
+
         InetSocketAddress recipientAddress = new InetSocketAddress(recipient.address().host(), recipient.address().port());
 
         if (isSelf(recipient.name(), recipientAddress)) {
@@ -389,4 +399,26 @@ public class DefaultMessagingService extends AbstractMessagingService {
         IgniteUtils.shutdownAndAwaitTermination(inboundExecutor, 10, TimeUnit.SECONDS);
         IgniteUtils.shutdownAndAwaitTermination(outboundExecutor, 10, TimeUnit.SECONDS);
     }
+
+    /**
+     * Installs a predicate that will be consulted for each message being sent; when it returns {@code true}, the
+     * message will be silently dropped (it will not be sent, and the corresponding future will never complete).
+     *
+     * @param predicate Predicate that will decide whether a message should be dropped. Its first argument is the recipient
+     *     node's consistent ID, its second argument is the message being sent.
+     */
+    @TestOnly
+    public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
+        dropMessagePredicate = predicate;
+    }
+
+    /**
+     * Stops dropping messages by removing the predicate installed with {@link #dropMessages(BiPredicate)}.
+     *
+     * @see #dropMessages(BiPredicate)
+     */
+    @TestOnly
+    public void stopDroppingMessages() {
+        dropMessagePredicate = null;
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 74dacefbe0..10fe272c58 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -117,7 +117,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
      * @param testInfo Test info (used to build node name).
      * @return Started Ignite node.
      */
-    protected Ignite startNode(int nodeIndex, TestInfo testInfo) {
+    protected IgniteImpl startNode(int nodeIndex, TestInfo testInfo) {
         CompletableFuture<Ignite> future = startNode0(nodeIndex, testInfo);
 
         assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
@@ -133,7 +133,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
                     + clusterNodes.size() + " nodes");
         }
 
-        return ignite;
+        return (IgniteImpl) ignite;
     }
 
     /**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index ce360d0f79..e048855e30 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -26,11 +26,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.AbstractClusterIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -93,7 +96,7 @@ class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
     }
 
     @Test
-    void receivesLogicalTopologyEventsFromRestart(TestInfo testInfo) throws Exception {
+    void receivesLogicalTopologyEventsCausedByNodeRestart(TestInfo testInfo) throws Exception {
         IgniteImpl entryNode = node(0);
 
         Ignite secondIgnite = startNode(1, testInfo);
@@ -119,6 +122,49 @@ class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
         assertThat(joinEvent.topologyVersion, is(4L));
     }
 
+    @Test
+    void nodeReturnedToPhysicalTopologyReturnsToLogicalTopology(TestInfo testInfo) throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        IgniteImpl secondIgnite = startNode(1, testInfo);
+
+        makeSecondNodeDisappearForFirstNode(entryNode, secondIgnite); // Returns once the second node left the logical topology.
+
+        CountDownLatch secondIgniteAppeared = new CountDownLatch(1); // Released when the second node reappears.
+
+        entryNode.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
+            @Override
+            public void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+                if (appearedNode.name().equals(secondIgnite.name())) {
+                    secondIgniteAppeared.countDown();
+                }
+
+            }
+        });
+
+        entryNode.stopDroppingMessages(); // Restore connectivity; the second node is expected to rejoin automatically.
+
+        assertTrue(secondIgniteAppeared.await(10, TimeUnit.SECONDS), "Did not see second node coming back in time");
+    }
+
+    private static void makeSecondNodeDisappearForFirstNode(IgniteImpl firstIgnite, IgniteImpl secondIgnite) throws InterruptedException {
+        CountDownLatch secondIgniteDisappeared = new CountDownLatch(1); // Released when the first node sees the second leave.
+
+        firstIgnite.logicalTopologyService().addEventListener(new LogicalTopologyEventListener() {
+            @Override
+            public void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+                if (disappearedNode.name().equals(secondIgnite.name())) {
+                    secondIgniteDisappeared.countDown();
+                }
+            }
+        });
+
+        firstIgnite.dropMessages((recipientConsistentId, message) ->
+                secondIgnite.node().name().equals(recipientConsistentId) && message instanceof ScaleCubeMessage);
+
+        assertTrue(secondIgniteDisappeared.await(10, TimeUnit.SECONDS), "Did not see second node leaving in time");
+    }
+
     private static class Event {
         private final boolean appeared;
         private final ClusterNode node;
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c0a59f14ef..99b01fd1bc 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,6 +27,7 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.Ignite;
@@ -114,9 +115,11 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NodeMetadata;
 import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
 import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
@@ -820,4 +823,14 @@ public class IgniteImpl implements Ignite {
     public LogicalTopologyService logicalTopologyService() {
         return logicalTopologyService;
     }
+
+    @TestOnly
+    public void dropMessages(BiPredicate<String, NetworkMessage> predicate) {
+        ((DefaultMessagingService) clusterSvc.messagingService()).dropMessages(predicate); // Assumes a DefaultMessagingService.
+    }
+
+    @TestOnly
+    public void stopDroppingMessages() {
+        ((DefaultMessagingService) clusterSvc.messagingService()).stopDroppingMessages(); // Assumes a DefaultMessagingService.
+    }
 }