You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this format.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/07/28 14:10:14 UTC

[ignite-3] branch main updated: IGNITE-20076 Improve networking shutdown implementation (#2375)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 4084a87d19 IGNITE-20076 Improve networking shutdown implementation (#2375)
4084a87d19 is described below

commit 4084a87d194e007c7bd2fc59cf7e5653b4cf6204
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Jul 28 18:09:07 2023 +0400

    IGNITE-20076 Improve networking shutdown implementation (#2375)
---
 .../cluster/management/ItClusterManagerTest.java   |  2 +-
 .../internal/network/netty/ConnectionManager.java  | 46 +++++++++++++++++---
 .../recovery/RecoveryClientHandshakeManager.java   | 50 +++++++++++++++++-----
 .../recovery/RecoveryServerHandshakeManager.java   | 50 +++++++++++++++++-----
 .../recovery/message/HandshakeRejectedMessage.java |  6 +++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  2 +
 .../network/netty/RecoveryHandshakeTest.java       | 20 ++++++++-
 .../network/DefaultMessagingServiceTest.java       |  3 +-
 8 files changed, 150 insertions(+), 29 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 6ea0bd490f..bc20440a6e 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -346,7 +346,7 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
 
         stopNodes(List.of(leaderNode));
 
-        // Issue the JoinReadCommand on the joining node. It is expected that the joining node is still treated as validated.
+        // Issue the JoinReadyCommand on the joining node. It is expected that the joining node is still treated as validated.
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 2a77bfe720..df8a27edee 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManage
 import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ChannelType;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,8 @@ public class ConnectionManager implements ChannelCreationListener {
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
+    private final AtomicBoolean stopping = new AtomicBoolean(false);
+
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -234,14 +237,18 @@ public class ConnectionManager implements ChannelCreationListener {
             }
         }
 
-        // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
+        // Get an existing client or create a new one. NettyClient provides a future that resolves
         // when the client is ready for write operations, so previously started client, that didn't establish connection
         // or didn't perform the handshake operation, can be reused.
-        NettyClient client = clients.compute(
+        @Nullable NettyClient client = clients.compute(
                 new ConnectorKey<>(address, type),
                 (key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
         );
 
+        if (client == null) {
+            return OrderingFuture.failedFuture(new NodeStoppingException("No outgoing connections are allowed as the node is stopping"));
+        }
+
         return client.sender();
     }
 
@@ -277,9 +284,14 @@ public class ConnectionManager implements ChannelCreationListener {
      * Create new client from this node to specified address.
      *
      * @param address Target address.
-     * @return New netty client.
+     * @return New netty client or {@code null} if we are stopping.
      */
+    @Nullable
     private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
+        if (stopping.get()) {
+            return null;
+        }
+
         var client = new NettyClient(
                 address,
                 serializationService,
@@ -343,7 +355,15 @@ public class ConnectionManager implements ChannelCreationListener {
 
     private HandshakeManager createClientHandshakeManager(short connectionId) {
         if (clientHandshakeManagerFactory == null) {
-            return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, descriptorProvider, staleIdDetector, this);
+            return new RecoveryClientHandshakeManager(
+                    launchId,
+                    consistentId,
+                    connectionId,
+                    descriptorProvider,
+                    staleIdDetector,
+                    this,
+                    stopping
+            );
         }
 
         return clientHandshakeManagerFactory.create(
@@ -355,7 +375,15 @@ public class ConnectionManager implements ChannelCreationListener {
     }
 
     private HandshakeManager createServerHandshakeManager() {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider, staleIdDetector, this);
+        return new RecoveryServerHandshakeManager(
+                launchId,
+                consistentId,
+                FACTORY,
+                descriptorProvider,
+                staleIdDetector,
+                this,
+                stopping
+        );
     }
 
     /**
@@ -401,4 +429,12 @@ public class ConnectionManager implements ChannelCreationListener {
     public Map<ConnectorKey<String>, NettySender> channels() {
         return Map.copyOf(channels);
     }
+
+    /**
+     * Marks this connection manager as being stopped. In this state, it does not make any new connections, does not accept any connections
+     * and does not consider handshake rejections as critical events.
+     */
+    public void initiateStopping() {
+        stopping.set(true);
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 53a00335a9..f157b5b07e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeStartRespons
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Recovery protocol handshake manager for a client.
@@ -68,6 +68,8 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
 
+    private final AtomicBoolean stopping;
+
     /** Connection id. */
     private final short connectionId;
 
@@ -100,6 +102,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
      * @param launchId Launch id.
      * @param consistentId Consistent id.
      * @param recoveryDescriptorProvider Recovery descriptor provider.
+     * @param stopping Defines whether the corresponding connection manager is stopping.
      */
     public RecoveryClientHandshakeManager(
             UUID launchId,
@@ -107,13 +110,15 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
             short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
             StaleIdDetector staleIdDetector,
-            ChannelCreationListener channelCreationListener
+            ChannelCreationListener channelCreationListener,
+            AtomicBoolean stopping
     ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionId = connectionId;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
         this.staleIdDetector = staleIdDetector;
+        this.stopping = stopping;
 
         this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
@@ -156,12 +161,20 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
         if (message instanceof HandshakeRejectedMessage) {
             HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
 
-            LOG.warn("Handshake rejected by server: {}", msg.reason());
+            boolean ignorable = stopping.get() || !msg.critical();
+
+            if (ignorable) {
+                LOG.debug("Handshake rejected by server: {}", msg.reason());
+            } else {
+                LOG.warn("Handshake rejected by server: {}", msg.reason());
+            }
 
             handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason()));
 
-            // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
-            failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.reason()));
+            if (!ignorable) {
+                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.reason()));
+            }
 
             return;
         }
@@ -209,6 +222,12 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
             return;
         }
 
+        if (stopping.get()) {
+            handleRefusalToEstablishConnectionDueToStopping(message);
+
+            return;
+        }
+
         this.remoteLaunchId = message.launchId();
         this.remoteConsistentId = message.consistentId();
 
@@ -246,9 +265,25 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     private void handleStaleServerId(HandshakeStartMessage msg) {
         String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, server should be restarted so that clients can connect";
         HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage()
+                .critical(true)
                 .reason(reason)
                 .build();
 
+        sendHandshakeRejectedMessage(rejectionMessage, reason);
+    }
+
+    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
+        String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId
+                + ", but it's stopping";
+        HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage()
+                .critical(false)
+                .reason(reason)
+                .build();
+
+        sendHandshakeRejectedMessage(rejectionMessage, reason);
+    }
+
+    private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) {
         ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(rejectionMessage, emptyList(), false));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
@@ -307,9 +342,4 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
 
         handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId));
     }
-
-    @TestOnly
-    public RecoveryDescriptor recoveryDescriptor() {
-        return recoveryDescriptor;
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 234f508dd0..f65948eed2 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeStartRespons
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Recovery protocol handshake manager for a server.
@@ -90,6 +90,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     /** Used to detect that a peer uses a stale ID. */
     private final StaleIdDetector staleIdDetector;
 
+    private final AtomicBoolean stopping;
+
     /** Recovery descriptor. */
     private RecoveryDescriptor recoveryDescriptor;
 
@@ -102,6 +104,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
      * @param consistentId Consistent id.
      * @param messageFactory Message factory.
      * @param recoveryDescriptorProvider Recovery descriptor provider.
+     * @param stopping Defines whether the corresponding connection manager is stopping.
      */
     public RecoveryServerHandshakeManager(
             UUID launchId,
@@ -109,13 +112,15 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
             NetworkMessagesFactory messageFactory,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
             StaleIdDetector staleIdDetector,
-            ChannelCreationListener channelCreationListener
+            ChannelCreationListener channelCreationListener,
+            AtomicBoolean stopping
     ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
         this.staleIdDetector = staleIdDetector;
+        this.stopping = stopping;
 
         this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
@@ -177,12 +182,20 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
         if (message instanceof HandshakeRejectedMessage) {
             HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
 
-            LOG.warn("Handshake rejected by client: {}", msg.reason());
+            boolean ignorable = stopping.get() || !msg.critical();
+
+            if (ignorable) {
+                LOG.debug("Handshake rejected by client: {}", msg.reason());
+            } else {
+                LOG.warn("Handshake rejected by client: {}", msg.reason());
+            }
 
             handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason()));
 
-            // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
-            failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.reason()));
+            if (!ignorable) {
+                // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+                failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.reason()));
+            }
 
             return;
         }
@@ -208,6 +221,12 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
             return;
         }
 
+        if (stopping.get()) {
+            handleRefusalToEstablishConnectionDueToStopping(message);
+
+            return;
+        }
+
         this.remoteLaunchId = remoteLaunchId;
         this.remoteConsistentId = remoteConsistentId;
         this.receivedCount = remoteReceivedCount;
@@ -247,9 +266,25 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     private void handleStaleClientId(HandshakeStartResponseMessage msg) {
         String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, client should be restarted to be allowed to connect";
         HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage()
+                .critical(true)
                 .reason(reason)
                 .build();
 
+        sendHandshakeRejectedMessage(rejectionMessage, reason);
+    }
+
+    private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage msg) {
+        String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId
+                + ", but it's stopping";
+        HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage()
+                .critical(false)
+                .reason(reason)
+                .build();
+
+        sendHandshakeRejectedMessage(rejectionMessage, reason);
+    }
+
+    private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) {
         ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(rejectionMessage, emptyList(), false));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
@@ -331,9 +366,4 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
 
         handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, remoteChannelId));
     }
-
-    @TestOnly
-    public RecoveryDescriptor recoveryDescriptor() {
-        return recoveryDescriptor;
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
index 88c5efe3f9..9871de3597 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -35,4 +35,10 @@ public interface HandshakeRejectedMessage extends InternalMessage {
      * @return Reason of the rejection.
      */
     String reason();
+
+    /**
+     * Returns {@code true} iff the rejection is not expected and should be treated as a critical failure (requiring
+     * the rejected node to restart).
+     */
+    boolean critical();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 14f012953d..87bcc22d39 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -177,6 +177,8 @@ public class ScaleCubeClusterServiceFactory {
                     return;
                 }
 
+                connectionMgr.initiateStopping();
+
                 cluster.shutdown();
 
                 try {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index a83aa0975c..305a43c2b2 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -541,7 +541,15 @@ public class RecoveryHandshakeTest {
 
     private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
-        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, provider, staleIdDetector, channel -> {});
+        return new RecoveryClientHandshakeManager(
+                launchId,
+                consistentId,
+                CONNECTION_ID,
+                provider,
+                staleIdDetector,
+                channel -> {},
+                new AtomicBoolean(false)
+        );
     }
 
     private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -555,7 +563,15 @@ public class RecoveryHandshakeTest {
 
     private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider, staleIdDetector, channel -> {});
+        return new RecoveryServerHandshakeManager(
+                launchId,
+                consistentId,
+                MESSAGE_FACTORY,
+                provider,
+                staleIdDetector,
+                channel -> {},
+                new AtomicBoolean(false)
+        );
     }
 
     private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index cb1736b073..28a99b5304 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -326,7 +326,8 @@ class DefaultMessagingServiceTest {
                         connectionId,
                         recoveryDescriptorProvider,
                         staleIdDetector,
-                        channel -> {}
+                        channel -> {},
+                        new AtomicBoolean(false)
                 ) {
                     @Override
                     protected void finishHandshake() {