You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/04 01:10:45 UTC

[james-project] 01/03: JAMES-3737 Allow configuring boss thread count for protocols

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ef0b073344506845a319e06e3c723e66a407d39d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Apr 18 16:37:09 2022 +0700

    JAMES-3737 Allow configuring boss thread count for protocols
    
    These threads are used for accepting incoming connections.
    
     - Ability to specify the thread count
     - Ability to instead rely on IO threads
---
 .../james/protocols/netty/AbstractAsyncServer.java | 42 ++++++++++++++++------
 .../lib/netty/AbstractConfigurableAsyncServer.java |  4 ++-
 2 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
index 392c5a65f4..13d343319b 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
@@ -21,6 +21,7 @@ package org.apache.james.protocols.netty;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.james.protocols.api.ProtocolServer;
 import org.apache.james.util.concurrent.NamedThreadFactory;
@@ -49,11 +50,12 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
 public abstract class AbstractAsyncServer implements ProtocolServer {
 
     public static final int DEFAULT_IO_WORKER_COUNT = Runtime.getRuntime().availableProcessors() * 2;
+    public static final int DEFAULT_BOSS_WORKER_COUNT = 2;
     private volatile int backlog = 250;
     
     private volatile int timeout = 120;
 
-    private EventLoopGroup bossGroup;
+    private Optional<EventLoopGroup> bossGroup;
     private EventLoopGroup workerGroup;
 
     private volatile boolean started;
@@ -61,7 +63,8 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
     private final ChannelGroup channels = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
 
     private volatile int ioWorker = DEFAULT_IO_WORKER_COUNT;
-    
+    private volatile Optional<Integer> bossWorker = Optional.of(DEFAULT_IO_WORKER_COUNT);
+
     private List<InetSocketAddress> addresses = new ArrayList<>();
 
     protected String jmxName;
@@ -80,7 +83,11 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
     }
 
     /**
-     * Set the IO-worker thread count to use. Default is nCores * 2
+     * Set the IO-worker thread count to use.
+     *
+     * IO threads are used for receiving and framing IMAP messages.
+     *
+     * Default is nCores * 2
      */
     public void setIoWorkerCount(int ioWorker) {
         if (started) {
@@ -89,6 +96,20 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
         this.ioWorker = ioWorker;
     }
 
+    /**
+     * Set the Boss-worker thread count to use.
+     *
+     * Boss threads are responsible of accepting new connections.
+     *
+     * Default is to not use boss threads and let the IO threads hand over this responsibility.
+     */
+    public void setBossWorkerCount(Optional<Integer> bossWorker) {
+        if (started) {
+            throw new IllegalStateException("Can only be set when the server is not running");
+        }
+        this.bossWorker = bossWorker;
+    }
+
     @Override
     public synchronized void bind() throws Exception {
         if (started) {
@@ -102,10 +123,12 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
         ServerBootstrap bootstrap = new ServerBootstrap();
         bootstrap.channel(NioServerSocketChannel.class);
 
-        bossGroup = new NioEventLoopGroup(2, NamedThreadFactory.withName(jmxName + "-boss"));
+        bossGroup = bossWorker.map(count -> new NioEventLoopGroup(2, NamedThreadFactory.withName(jmxName + "-boss")));
         workerGroup = new NioEventLoopGroup(ioWorker, NamedThreadFactory.withName(jmxName + "-io"));
 
-        bootstrap.group(bossGroup, workerGroup);
+        bossGroup.<Runnable>map(boss -> () -> bootstrap.group(boss, workerGroup))
+            .orElse(() -> bootstrap.group(workerGroup))
+            .run();
 
         ChannelInitializer<SocketChannel> factory = createPipelineFactory();
 
@@ -139,17 +162,14 @@ public abstract class AbstractAsyncServer implements ProtocolServer {
         }
 
         List<Future<?>> futures = new ArrayList<>();
-        if (bossGroup != null) {
-            futures.add(bossGroup.shutdownGracefully());
-        }
+
+        bossGroup.ifPresent(boss -> futures.add(boss.shutdownGracefully()));
 
         if (workerGroup != null) {
             futures.add(workerGroup.shutdownGracefully());
         }
 
-        if (channels != null) {
-            futures.add(channels.close());
-        }
+        futures.add(channels.close());
 
         if (gracefulShutdown) {
             futures.forEach(Throwing.<Future<?>>consumer(Future::await).sneakyThrow());
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
index 3935072f87..18c30622dc 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
@@ -166,6 +166,8 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
         jmxName = config.getString("jmxName", getDefaultJMXName());
         int ioWorker = config.getInt("ioWorkerCount", DEFAULT_IO_WORKER_COUNT);
         setIoWorkerCount(ioWorker);
+        Integer bossWorker = config.getInteger("bossWorkerCount", null);
+        setBossWorkerCount(Optional.ofNullable(bossWorker));
 
         executorGroup = new DefaultEventExecutorGroup(config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT),
             NamedThreadFactory.withName(jmxName));
@@ -445,7 +447,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
     
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory() {
-        return new AbstractSSLAwareChannelPipelineFactory(getTimeout(), connectionLimit, connPerIP,
+        return new AbstractSSLAwareChannelPipelineFactory<>(getTimeout(), connectionLimit, connPerIP,
             getEncryption(), getFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org