You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/22 02:01:37 UTC

[james-project] 06/29: JAMES-3715 Execute core handlers outside of the event loop

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 92dcc05469b17f014fc72711ec0c3269b4160b7b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 7 09:28:31 2022 +0700

    JAMES-3715 Execute core handlers outside of the event loop
    
    WIP SMTP pipelining is currently broken
---
 .../protocols/netty/AbstractChannelPipelineFactory.java  | 13 ++++++++-----
 .../netty/AbstractSSLAwareChannelPipelineFactory.java    | 10 ++++++----
 .../protocols/netty/BasicChannelUpstreamHandler.java     | 11 +++++++----
 .../james/protocols/netty/NettyProtocolTransport.java    |  7 +++++--
 .../org/apache/james/protocols/netty/NettyServer.java    |  8 +++++---
 .../org/apache/james/imapserver/netty/IMAPServer.java    | 10 +++++-----
 .../lib/netty/AbstractConfigurableAsyncServer.java       | 16 ++++++++++++----
 .../AbstractExecutorAwareChannelPipelineFactory.java     |  5 +++--
 .../org/apache/james/lmtpserver/netty/LMTPServer.java    |  2 +-
 .../james/managesieveserver/netty/ManageSieveServer.java | 14 +++++++-------
 .../org/apache/james/pop3server/netty/POP3Server.java    |  2 +-
 .../smtpserver/netty/SMTPChannelUpstreamHandler.java     |  9 +++++----
 .../org/apache/james/smtpserver/netty/SMTPServer.java    |  2 +-
 13 files changed, 66 insertions(+), 43 deletions(-)

diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
index 32049af..db4a1bd 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * Abstract base class for {@link ChannelInitializer} implementations
@@ -38,19 +39,21 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex
     private final ChannelGroupHandler groupHandler;
     private final int timeout;
     private final ChannelHandlerFactory frameHandlerFactory;
+    private final EventExecutorGroup eventExecutorGroup;
 
     public AbstractChannelPipelineFactory(ChannelGroup channels,
-                                          ChannelHandlerFactory frameHandlerFactory) {
-        this(0, 0, 0, channels, frameHandlerFactory);
+                                          ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+        this(0, 0, 0, channels, frameHandlerFactory, eventExecutorGroup);
     }
 
     public AbstractChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup channels,
-                                          ChannelHandlerFactory frameHandlerFactory) {
+                                          ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
         this.connectionLimitHandler = new ConnectionLimitUpstreamHandler(maxConnections);
         this.connectionPerIpLimitHandler = new ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp);
         this.groupHandler = new ChannelGroupHandler(channels);
         this.timeout = timeout;
         this.frameHandlerFactory = frameHandlerFactory;
+        this.eventExecutorGroup = eventExecutorGroup;
     }
     
     
@@ -66,13 +69,13 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex
 
         
         // Add the text line decoder which limit the max line length, don't strip the delimiter and use CRLF as delimiter
-        pipeline.addLast(HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline));
+        pipeline.addLast(eventExecutorGroup, HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline));
        
         // Add the ChunkedWriteHandler to be able to write ChunkInput
         pipeline.addLast(HandlerConstants.CHUNK_HANDLER, new ChunkedWriteHandler());
         pipeline.addLast(HandlerConstants.TIMEOUT_HANDLER, new TimeoutHandler(timeout));
 
-        pipeline.addLast(HandlerConstants.CORE_HANDLER, createHandler());
+        pipeline.addLast(eventExecutorGroup, HandlerConstants.CORE_HANDLER, createHandler());
     }
 
     
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
index e509ee0..f3399c9 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -39,14 +40,15 @@ public abstract class AbstractSSLAwareChannelPipelineFactory<C extends SocketCha
 
     public AbstractSSLAwareChannelPipelineFactory(int timeout,
                                                   int maxConnections, int maxConnectsPerIp, ChannelGroup group,
-                                                  ChannelHandlerFactory frameHandlerFactory) {
-        super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory);
+                                                  ChannelHandlerFactory frameHandlerFactory,
+                                                  EventExecutorGroup eventExecutorGroup) {
+        super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup);
     }
 
     public AbstractSSLAwareChannelPipelineFactory(int timeout,
             int maxConnections, int maxConnectsPerIp, ChannelGroup group, Encryption secure,
-            ChannelHandlerFactory frameHandlerFactory) {
-        this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory);
+            ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+        this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup);
 
         this.secure = secure;
     }
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
index 08086a6..1cde815 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
@@ -51,6 +51,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -67,16 +68,18 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
     protected final Protocol protocol;
     protected final ProtocolHandlerChain chain;
     protected final Encryption secure;
+    private final EventExecutorGroup eventExecutors;
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
-        this(mdcContextFactory, protocol, null);
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) {
+        this(mdcContextFactory, protocol, null, eventExecutors);
     }
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup eventExecutors) {
         this.mdcContextFactory = mdcContextFactory;
         this.protocol = protocol;
         this.chain = protocol.getProtocolChain();
         this.secure = secure;
+        this.eventExecutors = eventExecutors;
     }
 
 
@@ -197,7 +200,7 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
             engine = secure.createSSLEngine();
         }
 
-        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
+        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine, eventExecutors));
     }
 
     @Override
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
index 36d5ab8..bdeee67 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.DefaultFileRegion;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedStream;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -47,11 +48,13 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
     
     private final Channel channel;
     private final SSLEngine engine;
+    private final EventExecutorGroup eventExecutors;
     private int lineHandlerCount = 0;
     
-    public NettyProtocolTransport(Channel channel, SSLEngine engine) {
+    public NettyProtocolTransport(Channel channel, SSLEngine engine, EventExecutorGroup eventExecutors) {
         this.channel = channel;
         this.engine = engine;
+        this.eventExecutors = eventExecutors;
     }
 
     @Override
@@ -156,7 +159,7 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
         // it is executed with the same ExecutorHandler as the coreHandler (if one exist)
         // 
         // See JAMES-1277
-        channel.pipeline().addBefore(HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
+        channel.pipeline().addBefore(eventExecutors, HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
     }
     
    
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index 35da9a9..e50a498 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -28,8 +28,9 @@ import org.apache.james.protocols.api.Protocol;
 import com.google.common.base.Preconditions;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
-
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
 
 
 /**
@@ -104,7 +105,7 @@ public class NettyServer extends AbstractAsyncServer {
     }
 
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
+        return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure, new DefaultEventExecutorGroup(2));
     }
     
     @Override
@@ -126,7 +127,8 @@ public class NettyServer extends AbstractAsyncServer {
             maxCurConnectionsPerIP,
             group,
             secure,
-            getFrameHandlerFactory()) {
+            getFrameHandlerFactory(),
+            new DefaultEventLoopGroup(16)) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 7896835..649f4a8 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -211,7 +211,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) {
         
-        return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory()) {
+        return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
@@ -235,7 +235,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
                 // Add the text line decoder which limit the max line length,
                 // don't strip the delimiter and use CRLF as delimiter
                 // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
-                pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline));
+                pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline));
                
                 Encryption secure = getEncryption();
                 if (secure != null && !secure.isStartTLS()) {
@@ -248,11 +248,11 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
                 }
                 pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
 
-                pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
+                pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
 
-                pipeline.addLast(REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
+                pipeline.addLast(getExecutorGroup(), REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
 
-                pipeline.addLast(CORE_HANDLER, createHandler());
+                pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createCoreHandler());
             }
 
         };
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
index 1ccbd84..1da9ce6 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
@@ -48,6 +48,7 @@ import org.apache.james.protocols.lib.jmx.ServerMBean;
 import org.apache.james.protocols.netty.AbstractAsyncServer;
 import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
 import org.apache.james.protocols.netty.ChannelHandlerFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +56,8 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 import nl.altindag.ssl.SSLFactory;
 import nl.altindag.ssl.util.PemUtils;
 
@@ -118,7 +121,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
 
     private ChannelHandlerFactory frameHandlerFactory;
 
-    private int maxExecutorThreads;
+    private EventExecutorGroup executorGroup;
 
     private MBeanServer mbeanServer;
 
@@ -189,8 +192,8 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
         int ioWorker = config.getInt("ioWorkerCount", DEFAULT_IO_WORKER_COUNT);
         setIoWorkerCount(ioWorker);
 
-        maxExecutorThreads = config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT);
-
+        executorGroup = new DefaultEventExecutorGroup(config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT),
+            NamedThreadFactory.withName(jmxName));
         
         configureHelloName(config);
 
@@ -266,6 +269,10 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
 
     }
 
+    protected EventExecutorGroup getExecutorGroup() {
+        return executorGroup;
+    }
+
     @PostConstruct
     public final void init() throws Exception {
 
@@ -308,6 +315,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
         if (isEnabled()) {
             unbind();
             postDestroy();
+            executorGroup.shutdownGracefully();
 
             unregisterMBean();
         }
@@ -550,7 +558,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory(ChannelGroup group) {
         return new AbstractExecutorAwareChannelPipelineFactory(getTimeout(), connectionLimit, connPerIP, group,
-            getEncryption(), getFrameHandlerFactory()) {
+            getEncryption(), getFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
index a4271eb..4890f3c 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.james.protocols.netty.ChannelHandlerFactory;
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * Abstract base class which should get used if you MAY need an {@link ExecutionHandler}
@@ -35,8 +36,8 @@ public abstract class AbstractExecutorAwareChannelPipelineFactory extends Abstra
 
     public AbstractExecutorAwareChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp,
                                                        ChannelGroup group, Encryption encryption,
-                                                       ChannelHandlerFactory frameHandlerFactory) {
-        super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory);
+                                                       ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+        super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory, eventExecutorGroup);
     }
     
     /**
diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
index 7899a7a..0596813 100644
--- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
+++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
@@ -142,7 +142,7 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe
     @Override
     protected ChannelInboundHandlerAdapter createCoreHandler() {
         SMTPProtocol protocol = new SMTPProtocol(getProtocolHandlerChain(), lmtpConfig);
-        return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics);
+        return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics, getExecutorGroup());
     }
 
     @Override
diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
index e9f35fa..294ddc7 100644
--- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
+++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
@@ -86,7 +86,7 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement
     @Override
     protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) {
 
-        return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory()) {
+        return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory(), getExecutorGroup()) {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
@@ -113,13 +113,13 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement
                 // Add the text line decoder which limit the max line length,
                 // don't strip the delimiter and use CRLF as delimiter
                 // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
-                pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline));
-                pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
-                pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
+                pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline));
+                pipeline.addLast(getExecutorGroup(), CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
+                pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
 
-                pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
-                pipeline.addLast(CORE_HANDLER, createHandler());
-                pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
+                pipeline.addLast(getExecutorGroup(), "stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
+                pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createHandler());
+                pipeline.addLast(getExecutorGroup(), "stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
             }
 
         };
diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index 242874a..f05ec6c 100644
--- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -77,7 +77,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
     protected void preInit() throws Exception {
         super.preInit();
         POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
-        coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
+        coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), getExecutorGroup());
     }
 
     @Override
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
index 8152716..5379db5 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
@@ -29,6 +29,7 @@ import org.apache.james.smtpserver.SMTPConstants;
 
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
@@ -38,13 +39,13 @@ public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler {
 
     private final SmtpMetrics smtpMetrics;
 
-    public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics) {
-        super(new SMTPMDCContextFactory(), protocol, encryption);
+    public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol, encryption, eventExecutorGroup);
         this.smtpMetrics = smtpMetrics;
     }
 
-    public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics) {
-        super(new SMTPMDCContextFactory(), protocol);
+    public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup);
         this.smtpMetrics = smtpMetrics;
     }
 
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 65d8904..83f1534 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -219,7 +219,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
             }
             
         };
-        coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics);
+        coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org