You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 17:30:20 UTC

[ignite-3] 02/02: Update NettyServer to use NettyBootstrapFactory

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 31f39102f3eee9799fc5e955da3740e4b2440de7
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 20:30:08 2021 +0300

    Update NettyServer to use NettyBootstrapFactory
---
 .../internal/network/netty/ConnectionManager.java  |  24 ++--
 .../ignite/internal/network/netty/NettyServer.java | 127 +++------------------
 2 files changed, 22 insertions(+), 129 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index a42f903..efdb12e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.schemas.network.OutboundView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -56,17 +57,8 @@ public class ConnectionManager {
     /** Latest version of the direct marshalling protocol. */
     public static final byte DIRECT_PROTOCOL_VERSION = 1;
     
-    /** Client bootstrap. */
-    private final Bootstrap clientBootstrap;
-    
-    /** Server boss socket channel handler event loop group. */
-    private final EventLoopGroup bossGroup;
-    
-    /** Server work socket channel handler event loop group. */
-    private final EventLoopGroup workerGroup;
-    
-    /** Client socket channel handler event loop group. */
-    private final EventLoopGroup clientWorkerGroup;
+    /** Bootstrap factory. */
+    private final NettyBootstrapFactory bootstrapFactory;
     
     /** Server. */
     private final NettyServer server;
@@ -103,21 +95,21 @@ public class ConnectionManager {
      * @param consistentId                  Consistent id of this node.
      * @param serverHandshakeManagerFactory Server handshake manager factory.
      * @param clientHandshakeManagerFactory Client handshake manager factory.
+     * @param bootstrapFactory              Bootstrap factory.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             MessageSerializationRegistry registry,
             String consistentId,
             Supplier<HandshakeManager> serverHandshakeManagerFactory,
-            Supplier<HandshakeManager> clientHandshakeManagerFactory
+            Supplier<HandshakeManager> clientHandshakeManagerFactory,
+            NettyBootstrapFactory bootstrapFactory
     ) {
         this.serializationRegistry = registry;
         this.consistentId = consistentId;
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
-        
-        this.bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
-        this.workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
-        this.clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
+
+        this.bootstrapFactory = bootstrapFactory;
     
         this.server = new NettyServer(
                 consistentId,
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index f154625..d411294 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -21,8 +21,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -30,15 +28,14 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import org.apache.ignite.configuration.schemas.network.InboundView;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -54,12 +51,6 @@ public class NettyServer {
     /** {@link NioServerSocketChannel} bootstrapper. */
     private final ServerBootstrap bootstrap;
     
-    /** Socket accepter event loop group. */
-    private final EventLoopGroup bossGroup;
-    
-    /** Socket handler event loop group. */
-    private final EventLoopGroup workerGroup;
-    
     /** Server socket configuration. */
     private final NetworkView configuration;
     
@@ -86,7 +77,7 @@ public class NettyServer {
     private final Consumer<NettySender> newConnectionListener;
     
     /** Flag indicating if {@link #stop()} has been called. */
-    private boolean stopped = false;
+    private boolean stopped;
     
     /**
      * Constructor.
@@ -105,19 +96,16 @@ public class NettyServer {
             Consumer<NettySender> newConnectionListener,
             BiConsumer<SocketAddress, NetworkMessage> messageListener,
             MessageSerializationRegistry serializationRegistry,
-            EventLoopGroup bossGroup,
-            EventLoopGroup workerGroup
+            NettyBootstrapFactory bootstrapFactory
     ) {
         this(
                 consistentId,
-                new ServerBootstrap(),
+                bootstrapFactory.createServerBootstrap(),
                 configuration,
                 handshakeManager,
                 newConnectionListener,
                 messageListener,
-                serializationRegistry,
-                bossGroup,
-                workerGroup
+                serializationRegistry
         );
     }
     
@@ -139,9 +127,7 @@ public class NettyServer {
             Supplier<HandshakeManager> handshakeManager,
             Consumer<NettySender> newConnectionListener,
             BiConsumer<SocketAddress, NetworkMessage> messageListener,
-            MessageSerializationRegistry serializationRegistry,
-            EventLoopGroup bossGroup,
-            EventLoopGroup workerGroup
+            MessageSerializationRegistry serializationRegistry
     ) {
         this.bootstrap = bootstrap;
         this.configuration = configuration;
@@ -149,12 +135,6 @@ public class NettyServer {
         this.newConnectionListener = newConnectionListener;
         this.messageListener = messageListener;
         this.serializationRegistry = serializationRegistry;
-        this.bossGroup = bossGroup;
-        this.workerGroup = workerGroup;
-        serverCloseFuture = CompletableFuture.allOf(
-                NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
-                NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
-        );
     }
     
     /**
@@ -172,12 +152,7 @@ public class NettyServer {
                 throw new IgniteInternalException("Attempted to start an already started server");
             }
             
-            InboundView inboundConfiguration = configuration.inbound();
-            
-            // TODO: Reuse part of this logic and move to ConnectionManager.createServerBootstrap.
-            bootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
+            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                         /** {@inheritDoc} */
                         @Override
                         public void initChannel(SocketChannel ch) {
@@ -206,37 +181,7 @@ public class NettyServer {
                             
                             manager.handshakeFuture().thenAccept(newConnectionListener);
                         }
-                    })
-                    /*
-                     * The maximum queue length for incoming connection indications (a request to connect) is set
-                     * to the backlog parameter. If a connection indication arrives when the queue is full,
-                     * the connection is refused.
-                     */
-                    .option(ChannelOption.SO_BACKLOG, inboundConfiguration.soBacklog())
-                    .option(ChannelOption.SO_REUSEADDR, inboundConfiguration.soReuseAddr())
-                    /*
-                     * When the keepalive option is set for a TCP socket and no data has been exchanged across the socket
-                     * in either direction for 2 hours (NOTE: the actual value is implementation dependent),
-                     * TCP automatically sends a keepalive probe to the peer.
-                     */
-                    .childOption(ChannelOption.SO_KEEPALIVE, inboundConfiguration.soKeepAlive())
-                    /*
-                     * Specify a linger-on-close timeout. This option disables/enables immediate return from a close()
-                     * of a TCP Socket. Enabling this option with a non-zero Integer timeout means that a close() will
-                     * block pending the transmission and acknowledgement of all data written to the peer, at which point
-                     * the socket is closed gracefully. Upon reaching the linger timeout, the socket is closed forcefully,
-                     * with a TCP RST. Enabling the option with a timeout of zero does a forceful close immediately.
-                     * If the specified timeout value exceeds 65,535 it will be reduced to 65,535.
-                     */
-                    .childOption(ChannelOption.SO_LINGER, inboundConfiguration.soLinger())
-                    /*
-                     * Disable Nagle's algorithm for this connection. Written data to the network is not buffered pending
-                     * acknowledgement of previously written data. Valid for TCP only. Setting this option reduces
-                     * network latency and and delivery time for small messages.
-                     * For more information, see Socket#setTcpNoDelay(boolean)
-                     * and https://en.wikipedia.org/wiki/Nagle%27s_algorithm.
-                     */
-                    .childOption(ChannelOption.TCP_NODELAY, inboundConfiguration.tcpNoDelay());
+                    });
             
             int port = configuration.port();
             int portRange = configuration.portRange();
@@ -248,31 +193,16 @@ public class NettyServer {
             serverStartFuture = bindFuture
                     .handle((channel, err) -> {
                         synchronized (startStopLock) {
-                            CompletableFuture<Void> workerCloseFuture = serverCloseFuture;
-                            
                             if (channel != null) {
-                                CompletableFuture<Void> channelCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture())
-                                        // Shutdown event loops on channel close.
-                                        .whenComplete((v, err0) -> shutdownEventLoopGroups());
-                                
-                                serverCloseFuture = CompletableFuture.allOf(channelCloseFuture, workerCloseFuture);
+                                serverCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture());
                             }
                             
                             this.channel = (ServerChannel) channel;
-                            
-                            // Shutdown event loops if the server has failed to start or has been stopped.
-                            if (err != null || stopped) {
-                                shutdownEventLoopGroups();
     
-                                return workerCloseFuture.handle((unused, throwable) -> {
-                                    Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
-        
-                                    if (throwable != null) {
-                                        stopErr.addSuppressed(throwable);
-                                    }
-        
-                                    return CompletableFuture.<Void>failedFuture(stopErr);
-                                }).thenCompose(Function.identity());
+                            if (err != null || stopped) {
+                                Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
+                                
+                                return CompletableFuture.<Void>failedFuture(stopErr);
                             } else {
                                 return CompletableFuture.<Void>completedFuture(null);
                             }
@@ -344,41 +274,12 @@ public class NettyServer {
     }
     
     /**
-     * Shutdown event loops.
-     */
-    private void shutdownEventLoopGroups() {
-        // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
-        bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-        workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-    }
-    
-    /**
      * Returns {@code true} if the server is running, {@code false} otherwise.
      *
      * @return {@code true} if the server is running, {@code false} otherwise.
      */
     @TestOnly
     public boolean isRunning() {
-        return channel != null && channel.isOpen() && !bossGroup.isShuttingDown() && !workerGroup.isShuttingDown();
-    }
-    
-    /**
-     * Returns acceptor event loop group.
-     *
-     * @return Acceptor event loop group.
-     */
-    @TestOnly
-    public EventLoopGroup getBossGroup() {
-        return bossGroup;
-    }
-    
-    /**
-     * Returns worker event loop group.
-     *
-     * @return Worker event loop group.
-     */
-    @TestOnly
-    public EventLoopGroup getWorkerGroup() {
-        return workerGroup;
+        return channel != null && channel.isOpen();
     }
 }