You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/07/28 14:10:14 UTC
[ignite-3] branch main updated: IGNITE-20076 Improve networking shutdown implementation (#2375)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4084a87d19 IGNITE-20076 Improve networking shutdown implementation (#2375)
4084a87d19 is described below
commit 4084a87d194e007c7bd2fc59cf7e5653b4cf6204
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Fri Jul 28 18:09:07 2023 +0400
IGNITE-20076 Improve networking shutdown implementation (#2375)
---
.../cluster/management/ItClusterManagerTest.java | 2 +-
.../internal/network/netty/ConnectionManager.java | 46 +++++++++++++++++---
.../recovery/RecoveryClientHandshakeManager.java | 50 +++++++++++++++++-----
.../recovery/RecoveryServerHandshakeManager.java | 50 +++++++++++++++++-----
.../recovery/message/HandshakeRejectedMessage.java | 6 +++
.../scalecube/ScaleCubeClusterServiceFactory.java | 2 +
.../network/netty/RecoveryHandshakeTest.java | 20 ++++++++-
.../network/DefaultMessagingServiceTest.java | 3 +-
8 files changed, 150 insertions(+), 29 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 6ea0bd490f..bc20440a6e 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -346,7 +346,7 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
stopNodes(List.of(leaderNode));
- // Issue the JoinReadCommand on the joining node. It is expected that the joining node is still treated as validated.
+ // Issue the JoinReadyCommand on the joining node. It is expected that the joining node is still treated as validated.
assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 2a77bfe720..df8a27edee 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManage
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ChannelType;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.jetbrains.annotations.Nullable;
@@ -97,6 +98,8 @@ public class ConnectionManager implements ChannelCreationListener {
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean stopping = new AtomicBoolean(false);
+
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -234,14 +237,18 @@ public class ConnectionManager implements ChannelCreationListener {
}
}
- // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
+ // Get an existing client or create a new one. NettyClient provides a future that resolves
// when the client is ready for write operations, so previously started client, that didn't establish connection
// or didn't perform the handshake operation, can be reused.
- NettyClient client = clients.compute(
+ @Nullable NettyClient client = clients.compute(
new ConnectorKey<>(address, type),
(key, existingClient) -> isClientConnected(existingClient) ? existingClient : connect(key.id(), key.type())
);
+ if (client == null) {
+ return OrderingFuture.failedFuture(new NodeStoppingException("No outgoing connections are allowed as the node is stopping"));
+ }
+
return client.sender();
}
@@ -277,9 +284,14 @@ public class ConnectionManager implements ChannelCreationListener {
* Create new client from this node to specified address.
*
* @param address Target address.
- * @return New netty client.
+ * @return New netty client or {@code null} if we are stopping.
*/
+ @Nullable
private NettyClient connect(InetSocketAddress address, ChannelType channelType) {
+ if (stopping.get()) {
+ return null;
+ }
+
var client = new NettyClient(
address,
serializationService,
@@ -343,7 +355,15 @@ public class ConnectionManager implements ChannelCreationListener {
private HandshakeManager createClientHandshakeManager(short connectionId) {
if (clientHandshakeManagerFactory == null) {
- return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, descriptorProvider, staleIdDetector, this);
+ return new RecoveryClientHandshakeManager(
+ launchId,
+ consistentId,
+ connectionId,
+ descriptorProvider,
+ staleIdDetector,
+ this,
+ stopping
+ );
}
return clientHandshakeManagerFactory.create(
@@ -355,7 +375,15 @@ public class ConnectionManager implements ChannelCreationListener {
}
private HandshakeManager createServerHandshakeManager() {
- return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider, staleIdDetector, this);
+ return new RecoveryServerHandshakeManager(
+ launchId,
+ consistentId,
+ FACTORY,
+ descriptorProvider,
+ staleIdDetector,
+ this,
+ stopping
+ );
}
/**
@@ -401,4 +429,12 @@ public class ConnectionManager implements ChannelCreationListener {
public Map<ConnectorKey<String>, NettySender> channels() {
return Map.copyOf(channels);
}
+
+ /**
+ * Marks this connection manager as stopping (shutdown has been initiated). In this state, it does not open any new outgoing connections,
+ * rejects handshakes instead of completing them, and does not treat handshake rejections as critical events.
+ */
+ public void initiateStopping() {
+ stopping.set(true);
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 53a00335a9..f157b5b07e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeStartRespons
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
-import org.jetbrains.annotations.TestOnly;
/**
* Recovery protocol handshake manager for a client.
@@ -68,6 +68,8 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
+ private final AtomicBoolean stopping;
+
/** Connection id. */
private final short connectionId;
@@ -100,6 +102,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
* @param launchId Launch id.
* @param consistentId Consistent id.
* @param recoveryDescriptorProvider Recovery descriptor provider.
+ * @param stopping Defines whether the corresponding connection manager is stopping.
*/
public RecoveryClientHandshakeManager(
UUID launchId,
@@ -107,13 +110,15 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider,
StaleIdDetector staleIdDetector,
- ChannelCreationListener channelCreationListener
+ ChannelCreationListener channelCreationListener,
+ AtomicBoolean stopping
) {
this.launchId = launchId;
this.consistentId = consistentId;
this.connectionId = connectionId;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.staleIdDetector = staleIdDetector;
+ this.stopping = stopping;
this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
@@ -156,12 +161,20 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
if (message instanceof HandshakeRejectedMessage) {
HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
- LOG.warn("Handshake rejected by server: {}", msg.reason());
+ boolean ignorable = stopping.get() || !msg.critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.reason());
+ } else {
+ LOG.warn("Handshake rejected by server: {}", msg.reason());
+ }
handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason()));
- // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
- failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.reason()));
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake rejected by server: " + msg.reason()));
+ }
return;
}
@@ -209,6 +222,12 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
return;
}
+ if (stopping.get()) {
+ handleRefusalToEstablishConnectionDueToStopping(message);
+
+ return;
+ }
+
this.remoteLaunchId = message.launchId();
this.remoteConsistentId = message.consistentId();
@@ -246,9 +265,25 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
private void handleStaleServerId(HandshakeStartMessage msg) {
String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage()
+ .critical(true)
.reason(reason)
.build();
+ sendHandshakeRejectedMessage(rejectionMessage, reason);
+ }
+
+ private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
+ String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId
+ + ", but it's stopping";
+ HandshakeRejectedMessage rejectionMessage = MESSAGE_FACTORY.handshakeRejectedMessage()
+ .critical(false)
+ .reason(reason)
+ .build();
+
+ sendHandshakeRejectedMessage(rejectionMessage, reason);
+ }
+
+ private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) {
ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(rejectionMessage, emptyList(), false));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
@@ -307,9 +342,4 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, connectionId));
}
-
- @TestOnly
- public RecoveryDescriptor recoveryDescriptor() {
- return recoveryDescriptor;
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 234f508dd0..f65948eed2 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -44,7 +45,6 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeStartRespons
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
-import org.jetbrains.annotations.TestOnly;
/**
* Recovery protocol handshake manager for a server.
@@ -90,6 +90,8 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
+ private final AtomicBoolean stopping;
+
/** Recovery descriptor. */
private RecoveryDescriptor recoveryDescriptor;
@@ -102,6 +104,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
* @param consistentId Consistent id.
* @param messageFactory Message factory.
* @param recoveryDescriptorProvider Recovery descriptor provider.
+ * @param stopping Defines whether the corresponding connection manager is stopping.
*/
public RecoveryServerHandshakeManager(
UUID launchId,
@@ -109,13 +112,15 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
NetworkMessagesFactory messageFactory,
RecoveryDescriptorProvider recoveryDescriptorProvider,
StaleIdDetector staleIdDetector,
- ChannelCreationListener channelCreationListener
+ ChannelCreationListener channelCreationListener,
+ AtomicBoolean stopping
) {
this.launchId = launchId;
this.consistentId = consistentId;
this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.staleIdDetector = staleIdDetector;
+ this.stopping = stopping;
this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
@@ -177,12 +182,20 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
if (message instanceof HandshakeRejectedMessage) {
HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
- LOG.warn("Handshake rejected by client: {}", msg.reason());
+ boolean ignorable = stopping.get() || !msg.critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by client: {}", msg.reason());
+ } else {
+ LOG.warn("Handshake rejected by client: {}", msg.reason());
+ }
handshakeCompleteFuture.completeExceptionally(new HandshakeException(msg.reason()));
- // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
- failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.reason()));
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake rejected by client: " + msg.reason()));
+ }
return;
}
@@ -208,6 +221,12 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
return;
}
+ if (stopping.get()) {
+ handleRefusalToEstablishConnectionDueToStopping(message);
+
+ return;
+ }
+
this.remoteLaunchId = remoteLaunchId;
this.remoteConsistentId = remoteConsistentId;
this.receivedCount = remoteReceivedCount;
@@ -247,9 +266,25 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
private void handleStaleClientId(HandshakeStartResponseMessage msg) {
String reason = msg.consistentId() + ":" + msg.launchId() + " is stale, client should be restarted to be allowed to connect";
HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage()
+ .critical(true)
.reason(reason)
.build();
+ sendHandshakeRejectedMessage(rejectionMessage, reason);
+ }
+
+ private void handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage msg) {
+ String reason = msg.consistentId() + ":" + msg.launchId() + " tried to establish a connection with " + consistentId
+ + ", but it's stopping";
+ HandshakeRejectedMessage rejectionMessage = messageFactory.handshakeRejectedMessage()
+ .critical(false)
+ .reason(reason)
+ .build();
+
+ sendHandshakeRejectedMessage(rejectionMessage, reason);
+ }
+
+ private void sendHandshakeRejectedMessage(HandshakeRejectedMessage rejectionMessage, String reason) {
ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(rejectionMessage, emptyList(), false));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
@@ -331,9 +366,4 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId, remoteChannelId));
}
-
- @TestOnly
- public RecoveryDescriptor recoveryDescriptor() {
- return recoveryDescriptor;
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
index 88c5efe3f9..9871de3597 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -35,4 +35,10 @@ public interface HandshakeRejectedMessage extends InternalMessage {
* @return Reason of the rejection.
*/
String reason();
+
+ /**
+ * Returns {@code true} iff the rejection is not expected and should be treated as a critical failure (requiring
+ * the rejected node to restart).
+ */
+ boolean critical();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 14f012953d..87bcc22d39 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -177,6 +177,8 @@ public class ScaleCubeClusterServiceFactory {
return;
}
+ connectionMgr.initiateStopping();
+
cluster.shutdown();
try {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index a83aa0975c..305a43c2b2 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -541,7 +541,15 @@ public class RecoveryHandshakeTest {
private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
- return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, provider, staleIdDetector, channel -> {});
+ return new RecoveryClientHandshakeManager(
+ launchId,
+ consistentId,
+ CONNECTION_ID,
+ provider,
+ staleIdDetector,
+ channel -> {},
+ new AtomicBoolean(false)
+ );
}
private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -555,7 +563,15 @@ public class RecoveryHandshakeTest {
private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider, StaleIdDetector staleIdDetector) {
- return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider, staleIdDetector, channel -> {});
+ return new RecoveryServerHandshakeManager(
+ launchId,
+ consistentId,
+ MESSAGE_FACTORY,
+ provider,
+ staleIdDetector,
+ channel -> {},
+ new AtomicBoolean(false)
+ );
}
private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index cb1736b073..28a99b5304 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -326,7 +326,8 @@ class DefaultMessagingServiceTest {
connectionId,
recoveryDescriptorProvider,
staleIdDetector,
- channel -> {}
+ channel -> {},
+ new AtomicBoolean(false)
) {
@Override
protected void finishHandshake() {