You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 17:30:18 UTC

[ignite-3] branch ignite-15307 updated (03d847e -> 31f3910)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a change to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


    from 03d847e  Merge branch 'main' into ignite-15307
     new b187ba1  wip NettyBootstrapFactory
     new 31f3910  Update NettyServer to use NettyBootstrapFactory

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/network/netty/ConnectionManager.java  |  24 ++--
 .../ignite/internal/network/netty/NettyServer.java | 127 +++------------------
 .../ignite/network/NettyBootstrapFactory.java      |   8 +-
 3 files changed, 26 insertions(+), 133 deletions(-)

[ignite-3] 01/02: wip NettyBootstrapFactory

Posted by pt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b187ba1961002b63425e414dadc89ff89396b777
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 19:02:41 2021 +0300

    wip NettyBootstrapFactory
---
 .../java/org/apache/ignite/network/NettyBootstrapFactory.java     | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 4a71733..2c2b852 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
  */
 public class NettyBootstrapFactory {
     /** Network configuration. */
-    private NetworkView networkConfiguration;
+    private final NetworkView networkConfiguration;
     
     /** Server boss socket channel handler event loop group. */
     private final EventLoopGroup bossGroup;
@@ -59,9 +59,9 @@ public class NettyBootstrapFactory {
         
         this.networkConfiguration = networkConfiguration;
         
-        this.bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
-        this.workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
-        this.clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
+        bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
+        workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
+        clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
     }
     
     /**

[ignite-3] 02/02: Update NettyServer to use NettyBootstrapFactory

Posted by pt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 31f39102f3eee9799fc5e955da3740e4b2440de7
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 20:30:08 2021 +0300

    Update NettyServer to use NettyBootstrapFactory
---
 .../internal/network/netty/ConnectionManager.java  |  24 ++--
 .../ignite/internal/network/netty/NettyServer.java | 127 +++------------------
 2 files changed, 22 insertions(+), 129 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index a42f903..efdb12e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.schemas.network.OutboundView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -56,17 +57,8 @@ public class ConnectionManager {
     /** Latest version of the direct marshalling protocol. */
     public static final byte DIRECT_PROTOCOL_VERSION = 1;
     
-    /** Client bootstrap. */
-    private final Bootstrap clientBootstrap;
-    
-    /** Server boss socket channel handler event loop group. */
-    private final EventLoopGroup bossGroup;
-    
-    /** Server work socket channel handler event loop group. */
-    private final EventLoopGroup workerGroup;
-    
-    /** Client socket channel handler event loop group. */
-    private final EventLoopGroup clientWorkerGroup;
+    /** Bootstrap factory. */
+    private final NettyBootstrapFactory bootstrapFactory;
     
     /** Server. */
     private final NettyServer server;
@@ -103,21 +95,21 @@ public class ConnectionManager {
      * @param consistentId                  Consistent id of this node.
      * @param serverHandshakeManagerFactory Server handshake manager factory.
      * @param clientHandshakeManagerFactory Client handshake manager factory.
+     * @param bootstrapFactory              Bootstrap factory.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             MessageSerializationRegistry registry,
             String consistentId,
             Supplier<HandshakeManager> serverHandshakeManagerFactory,
-            Supplier<HandshakeManager> clientHandshakeManagerFactory
+            Supplier<HandshakeManager> clientHandshakeManagerFactory,
+            NettyBootstrapFactory bootstrapFactory
     ) {
         this.serializationRegistry = registry;
         this.consistentId = consistentId;
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
-        
-        this.bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
-        this.workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
-        this.clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
+
+        this.bootstrapFactory = bootstrapFactory;
     
         this.server = new NettyServer(
                 consistentId,
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index f154625..d411294 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -21,8 +21,6 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -30,15 +28,14 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import org.apache.ignite.configuration.schemas.network.InboundView;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -54,12 +51,6 @@ public class NettyServer {
     /** {@link NioServerSocketChannel} bootstrapper. */
     private final ServerBootstrap bootstrap;
     
-    /** Socket accepter event loop group. */
-    private final EventLoopGroup bossGroup;
-    
-    /** Socket handler event loop group. */
-    private final EventLoopGroup workerGroup;
-    
     /** Server socket configuration. */
     private final NetworkView configuration;
     
@@ -86,7 +77,7 @@ public class NettyServer {
     private final Consumer<NettySender> newConnectionListener;
     
     /** Flag indicating if {@link #stop()} has been called. */
-    private boolean stopped = false;
+    private boolean stopped;
     
     /**
      * Constructor.
@@ -105,19 +96,16 @@ public class NettyServer {
             Consumer<NettySender> newConnectionListener,
             BiConsumer<SocketAddress, NetworkMessage> messageListener,
             MessageSerializationRegistry serializationRegistry,
-            EventLoopGroup bossGroup,
-            EventLoopGroup workerGroup
+            NettyBootstrapFactory bootstrapFactory
     ) {
         this(
                 consistentId,
-                new ServerBootstrap(),
+                bootstrapFactory.createServerBootstrap(),
                 configuration,
                 handshakeManager,
                 newConnectionListener,
                 messageListener,
-                serializationRegistry,
-                bossGroup,
-                workerGroup
+                serializationRegistry
         );
     }
     
@@ -139,9 +127,7 @@ public class NettyServer {
             Supplier<HandshakeManager> handshakeManager,
             Consumer<NettySender> newConnectionListener,
             BiConsumer<SocketAddress, NetworkMessage> messageListener,
-            MessageSerializationRegistry serializationRegistry,
-            EventLoopGroup bossGroup,
-            EventLoopGroup workerGroup
+            MessageSerializationRegistry serializationRegistry
     ) {
         this.bootstrap = bootstrap;
         this.configuration = configuration;
@@ -149,12 +135,6 @@ public class NettyServer {
         this.newConnectionListener = newConnectionListener;
         this.messageListener = messageListener;
         this.serializationRegistry = serializationRegistry;
-        this.bossGroup = bossGroup;
-        this.workerGroup = workerGroup;
-        serverCloseFuture = CompletableFuture.allOf(
-                NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
-                NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
-        );
     }
     
     /**
@@ -172,12 +152,7 @@ public class NettyServer {
                 throw new IgniteInternalException("Attempted to start an already started server");
             }
             
-            InboundView inboundConfiguration = configuration.inbound();
-            
-            // TODO: Reuse part of this logic and move to ConnectionManager.createServerBootstrap.
-            bootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
+            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                         /** {@inheritDoc} */
                         @Override
                         public void initChannel(SocketChannel ch) {
@@ -206,37 +181,7 @@ public class NettyServer {
                             
                             manager.handshakeFuture().thenAccept(newConnectionListener);
                         }
-                    })
-                    /*
-                     * The maximum queue length for incoming connection indications (a request to connect) is set
-                     * to the backlog parameter. If a connection indication arrives when the queue is full,
-                     * the connection is refused.
-                     */
-                    .option(ChannelOption.SO_BACKLOG, inboundConfiguration.soBacklog())
-                    .option(ChannelOption.SO_REUSEADDR, inboundConfiguration.soReuseAddr())
-                    /*
-                     * When the keepalive option is set for a TCP socket and no data has been exchanged across the socket
-                     * in either direction for 2 hours (NOTE: the actual value is implementation dependent),
-                     * TCP automatically sends a keepalive probe to the peer.
-                     */
-                    .childOption(ChannelOption.SO_KEEPALIVE, inboundConfiguration.soKeepAlive())
-                    /*
-                     * Specify a linger-on-close timeout. This option disables/enables immediate return from a close()
-                     * of a TCP Socket. Enabling this option with a non-zero Integer timeout means that a close() will
-                     * block pending the transmission and acknowledgement of all data written to the peer, at which point
-                     * the socket is closed gracefully. Upon reaching the linger timeout, the socket is closed forcefully,
-                     * with a TCP RST. Enabling the option with a timeout of zero does a forceful close immediately.
-                     * If the specified timeout value exceeds 65,535 it will be reduced to 65,535.
-                     */
-                    .childOption(ChannelOption.SO_LINGER, inboundConfiguration.soLinger())
-                    /*
-                     * Disable Nagle's algorithm for this connection. Written data to the network is not buffered pending
-                     * acknowledgement of previously written data. Valid for TCP only. Setting this option reduces
-                     * network latency and and delivery time for small messages.
-                     * For more information, see Socket#setTcpNoDelay(boolean)
-                     * and https://en.wikipedia.org/wiki/Nagle%27s_algorithm.
-                     */
-                    .childOption(ChannelOption.TCP_NODELAY, inboundConfiguration.tcpNoDelay());
+                    });
             
             int port = configuration.port();
             int portRange = configuration.portRange();
@@ -248,31 +193,16 @@ public class NettyServer {
             serverStartFuture = bindFuture
                     .handle((channel, err) -> {
                         synchronized (startStopLock) {
-                            CompletableFuture<Void> workerCloseFuture = serverCloseFuture;
-                            
                             if (channel != null) {
-                                CompletableFuture<Void> channelCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture())
-                                        // Shutdown event loops on channel close.
-                                        .whenComplete((v, err0) -> shutdownEventLoopGroups());
-                                
-                                serverCloseFuture = CompletableFuture.allOf(channelCloseFuture, workerCloseFuture);
+                                serverCloseFuture = NettyUtils.toCompletableFuture(channel.closeFuture());
                             }
                             
                             this.channel = (ServerChannel) channel;
-                            
-                            // Shutdown event loops if the server has failed to start or has been stopped.
-                            if (err != null || stopped) {
-                                shutdownEventLoopGroups();
     
-                                return workerCloseFuture.handle((unused, throwable) -> {
-                                    Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
-        
-                                    if (throwable != null) {
-                                        stopErr.addSuppressed(throwable);
-                                    }
-        
-                                    return CompletableFuture.<Void>failedFuture(stopErr);
-                                }).thenCompose(Function.identity());
+                            if (err != null || stopped) {
+                                Throwable stopErr = err != null ? err : new CancellationException("Server was stopped");
+                                
+                                return CompletableFuture.<Void>failedFuture(stopErr);
                             } else {
                                 return CompletableFuture.<Void>completedFuture(null);
                             }
@@ -344,41 +274,12 @@ public class NettyServer {
     }
     
     /**
-     * Shutdown event loops.
-     */
-    private void shutdownEventLoopGroups() {
-        // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
-        bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-        workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-    }
-    
-    /**
      * Returns {@code true} if the server is running, {@code false} otherwise.
      *
      * @return {@code true} if the server is running, {@code false} otherwise.
      */
     @TestOnly
     public boolean isRunning() {
-        return channel != null && channel.isOpen() && !bossGroup.isShuttingDown() && !workerGroup.isShuttingDown();
-    }
-    
-    /**
-     * Returns acceptor event loop group.
-     *
-     * @return Acceptor event loop group.
-     */
-    @TestOnly
-    public EventLoopGroup getBossGroup() {
-        return bossGroup;
-    }
-    
-    /**
-     * Returns worker event loop group.
-     *
-     * @return Worker event loop group.
-     */
-    @TestOnly
-    public EventLoopGroup getWorkerGroup() {
-        return workerGroup;
+        return channel != null && channel.isOpen();
     }
 }