You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 17:30:20 UTC
[ignite-3] 02/02: Update NettyServer to use NettyBootstrapFactory
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 31f39102f3eee9799fc5e955da3740e4b2440de7
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 20:30:08 2021 +0300
Update NettyServer to use NettyBootstrapFactory
---
.../internal/network/netty/ConnectionManager.java | 24 ++--
.../ignite/internal/network/netty/NettyServer.java | 127 +++------------------
2 files changed, 22 insertions(+), 129 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index a42f903..efdb12e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.schemas.network.OutboundView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
@@ -56,17 +57,8 @@ public class ConnectionManager {
/** Latest version of the direct marshalling protocol. */
public static final byte DIRECT_PROTOCOL_VERSION = 1;
- /** Client bootstrap. */
- private final Bootstrap clientBootstrap;
-
- /** Server boss socket channel handler event loop group. */
- private final EventLoopGroup bossGroup;
-
- /** Server work socket channel handler event loop group. */
- private final EventLoopGroup workerGroup;
-
- /** Client socket channel handler event loop group. */
- private final EventLoopGroup clientWorkerGroup;
+ /** Bootstrap factory. */
+ private final NettyBootstrapFactory bootstrapFactory;
/** Server. */
private final NettyServer server;
@@ -103,21 +95,21 @@ public class ConnectionManager {
* @param consistentId Consistent id of this node.
* @param serverHandshakeManagerFactory Server handshake manager factory.
* @param clientHandshakeManagerFactory Client handshake manager factory.
+ * @param bootstrapFactory Bootstrap factory.
*/
public ConnectionManager(
NetworkView networkConfiguration,
MessageSerializationRegistry registry,
String consistentId,
Supplier<HandshakeManager> serverHandshakeManagerFactory,
- Supplier<HandshakeManager> clientHandshakeManagerFactory
+ Supplier<HandshakeManager> clientHandshakeManagerFactory,
+ NettyBootstrapFactory bootstrapFactory
) {
this.serializationRegistry = registry;
this.consistentId = consistentId;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
-
- this.bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
- this.workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
- this.clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
+
+ this.bootstrapFactory = bootstrapFactory;
this.server = new NettyServer(
consistentId,
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index f154625..d411294 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -21,8 +21,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -30,15 +28,14 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
-import org.apache.ignite.configuration.schemas.network.InboundView;
import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.jetbrains.annotations.Nullable;
@@ -54,12 +51,6 @@ public class NettyServer {
/** {@link NioServerSocketChannel} bootstrapper. */
private final ServerBootstrap bootstrap;
- /** Socket accepter event loop group. */
- private final EventLoopGroup bossGroup;
-
- /** Socket handler event loop group. */
- private final EventLoopGroup workerGroup;
-
/** Server socket configuration. */
private final NetworkView configuration;
@@ -86,7 +77,7 @@ public class NettyServer {
private final Consumer<NettySender> newConnectionListener;
/** Flag indicating if {@link #stop()} has been called. */
- private boolean stopped = false;
+ private boolean stopped;
/**
* Constructor.
@@ -105,19 +96,16 @@ public class NettyServer {
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
MessageSerializationRegistry serializationRegistry,
- EventLoopGroup bossGroup,
- EventLoopGroup workerGroup
+ NettyBootstrapFactory bootstrapFactory
) {
this(
consistentId,
- new ServerBootstrap(),
+ bootstrapFactory.createServerBootstrap(),
configuration,
handshakeManager,
newConnectionListener,
messageListener,
- serializationRegistry,
- bossGroup,
- workerGroup
+ serializationRegistry
);
}
@@ -139,9 +127,7 @@ public class NettyServer {
Supplier<HandshakeManager> handshakeManager,
Consumer<NettySender> newConnectionListener,
BiConsumer<SocketAddress, NetworkMessage> messageListener,
- MessageSerializationRegistry serializationRegistry,
- EventLoopGroup bossGroup,
- EventLoopGroup workerGroup
+ MessageSerializationRegistry serializationRegistry
) {
this.bootstrap = bootstrap;
this.configuration = configuration;
@@ -149,12 +135,6 @@ public class NettyServer {
this.newConnectionListener = newConnectionListener;
this.messageListener = messageListener;
this.serializationRegistry = serializationRegistry;
- this.bossGroup = bossGroup;
- this.workerGroup = workerGroup;
- serverCloseFuture = CompletableFuture.allOf(
- NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
- NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
- );
}
/**
@@ -172,12 +152,7 @@ public class NettyServer {
throw new IgniteInternalException("Attempted to start an already started server");
}
- InboundView inboundConfiguration = configuration.inbound();
-
- // TODO: Reuse part of this logic and move to ConnectionManager.createServerBootstrap.
- bootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
+ bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
/** {@inheritDoc} */
@Override
public void initChannel(SocketChannel ch) {
@@ -206,37 +181,7 @@ public class NettyServer {
manager.handshakeFuture().thenAccept(newConnectionListener);
}
- })
- /*
- * The maximum queue length for incoming connection indications (a request to connect) is set
- * to the backlog parameter. If a connection indication arrives when the queue is full,
- * the connection is refused.
- */
- .option(ChannelOption.SO_BACKLOG, inboundConfiguration.soBacklog())
- .option(ChannelOption.SO_REUSEADDR, inboundConfiguration.soReuseAddr())
- /*
- * When the keepalive option is set for a TCP socket and no data has been exchanged across the socket
- * in either direction for 2 hours (NOTE: the actual value is implementation dependent),
- * TCP automatically sends a keepalive probe to the peer.
- */
- .childOption(ChannelOption.SO_KEEPALIVE, inboundConfiguration.soKeepAlive())
- /*
- * Specify a linger-on-close timeout. This option disables/enables immediate return from a close()
- * of a TCP Socket. Enabling this option with a non-zero Integer timeout means that a close() will
- * block pending the transmission and acknowledgement of all data written to the peer, at which point
- * the socket is closed gracefully. Upon reaching the linger timeout, the socket is closed forcefully,
- * with a TCP RST. Enabling the option with a timeout of zero does a forceful close immediately.
- * If the specified timeout value exceeds 65,535 it will be reduced to 65,535.
- */
- .childOption(ChannelOption.SO_LINGER, inboundConfiguration.soLinger())
- /*
- * Disable Nagle's algorithm for this connection. Written data to the network is not buffered pending
- * acknowledgement of previously written data. Valid for TCP only. Setting this option reduces
- * network latency and and delivery time for small messages.
- * For more information, see Socket#setTcpNoDelay(boolean)
- * and https://en.wikipedia.org/wiki/Nagle%27s_algorithm.
- */
- .childOption(ChannelOption.TCP_NODELAY, inboundConfiguration.tcpNoDelay());
+ });
int port = configuration.port();
int portRange = configuration.portRange();
@@ -248,31 +193,16 @@ public class NettyServer {
serverStartFuture = bindFuture
.handle((channel, err) -> {
synchronized (startStopLock) {
- CompletableFuture<Void> workerCloseFuture = serverCloseFuture;
-
if (channel != null) {
- CompletableFuture<Void> channelCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture())
- // Shutdown event loops on channel close.
- .whenComplete((v, err0) -> shutdownEventLoopGroups());
-
- serverCloseFuture = CompletableFuture.allOf(channelCloseFuture, workerCloseFuture);
+ serverCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture());
}
this.channel = (ServerChannel) channel;
-
- // Shutdown event loops if the server has failed to start or has been stopped.
- if (err != null || stopped) {
- shutdownEventLoopGroups();
- return workerCloseFuture.handle((unused, throwable) -> {
- Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
-
- if (throwable != null) {
- stopErr.addSuppressed(throwable);
- }
-
- return CompletableFuture.<Void>failedFuture(stopErr);
- }).thenCompose(Function.identity());
+ if (err != null || stopped) {
+ Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
+
+ return CompletableFuture.<Void>failedFuture(stopErr);
} else {
return CompletableFuture.<Void>completedFuture(null);
}
@@ -344,41 +274,12 @@ public class NettyServer {
}
/**
- * Shutdown event loops.
- */
- private void shutdownEventLoopGroups() {
- // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
- bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
- workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
- }
-
- /**
* Returns {@code true} if the server is running, {@code false} otherwise.
*
* @return {@code true} if the server is running, {@code false} otherwise.
*/
@TestOnly
public boolean isRunning() {
- return channel != null && channel.isOpen() && !bossGroup.isShuttingDown() && !workerGroup.isShuttingDown();
- }
-
- /**
- * Returns acceptor event loop group.
- *
- * @return Acceptor event loop group.
- */
- @TestOnly
- public EventLoopGroup getBossGroup() {
- return bossGroup;
- }
-
- /**
- * Returns worker event loop group.
- *
- * @return Worker event loop group.
- */
- @TestOnly
- public EventLoopGroup getWorkerGroup() {
- return workerGroup;
+ return channel != null && channel.isOpen();
}
}