You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/22 02:01:37 UTC
[james-project] 06/29: JAMES-3715 Execute core handlers outside of the event loop
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 92dcc05469b17f014fc72711ec0c3269b4160b7b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 7 09:28:31 2022 +0700
JAMES-3715 Execute core handlers outside of the event loop
WIP SMTP pipelining is currently brokem
---
.../protocols/netty/AbstractChannelPipelineFactory.java | 13 ++++++++-----
.../netty/AbstractSSLAwareChannelPipelineFactory.java | 10 ++++++----
.../protocols/netty/BasicChannelUpstreamHandler.java | 11 +++++++----
.../james/protocols/netty/NettyProtocolTransport.java | 7 +++++--
.../org/apache/james/protocols/netty/NettyServer.java | 8 +++++---
.../org/apache/james/imapserver/netty/IMAPServer.java | 10 +++++-----
.../lib/netty/AbstractConfigurableAsyncServer.java | 16 ++++++++++++----
.../AbstractExecutorAwareChannelPipelineFactory.java | 5 +++--
.../org/apache/james/lmtpserver/netty/LMTPServer.java | 2 +-
.../james/managesieveserver/netty/ManageSieveServer.java | 14 +++++++-------
.../org/apache/james/pop3server/netty/POP3Server.java | 2 +-
.../smtpserver/netty/SMTPChannelUpstreamHandler.java | 9 +++++----
.../org/apache/james/smtpserver/netty/SMTPServer.java | 2 +-
13 files changed, 66 insertions(+), 43 deletions(-)
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
index 32049af..db4a1bd 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
@@ -25,6 +25,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
* Abstract base class for {@link ChannelInitializer} implementations
@@ -38,19 +39,21 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex
private final ChannelGroupHandler groupHandler;
private final int timeout;
private final ChannelHandlerFactory frameHandlerFactory;
+ private final EventExecutorGroup eventExecutorGroup;
public AbstractChannelPipelineFactory(ChannelGroup channels,
- ChannelHandlerFactory frameHandlerFactory) {
- this(0, 0, 0, channels, frameHandlerFactory);
+ ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+ this(0, 0, 0, channels, frameHandlerFactory, eventExecutorGroup);
}
public AbstractChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp, ChannelGroup channels,
- ChannelHandlerFactory frameHandlerFactory) {
+ ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
this.connectionLimitHandler = new ConnectionLimitUpstreamHandler(maxConnections);
this.connectionPerIpLimitHandler = new ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp);
this.groupHandler = new ChannelGroupHandler(channels);
this.timeout = timeout;
this.frameHandlerFactory = frameHandlerFactory;
+ this.eventExecutorGroup = eventExecutorGroup;
}
@@ -66,13 +69,13 @@ public abstract class AbstractChannelPipelineFactory<C extends SocketChannel> ex
// Add the text line decoder which limit the max line length, don't strip the delimiter and use CRLF as delimiter
- pipeline.addLast(HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline));
+ pipeline.addLast(eventExecutorGroup, HandlerConstants.FRAMER, frameHandlerFactory.create(pipeline));
// Add the ChunkedWriteHandler to be able to write ChunkInput
pipeline.addLast(HandlerConstants.CHUNK_HANDLER, new ChunkedWriteHandler());
pipeline.addLast(HandlerConstants.TIMEOUT_HANDLER, new TimeoutHandler(timeout));
- pipeline.addLast(HandlerConstants.CORE_HANDLER, createHandler());
+ pipeline.addLast(eventExecutorGroup, HandlerConstants.CORE_HANDLER, createHandler());
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
index e509ee0..f3399c9 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractSSLAwareChannelPipelineFactory.java
@@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
@@ -39,14 +40,15 @@ public abstract class AbstractSSLAwareChannelPipelineFactory<C extends SocketCha
public AbstractSSLAwareChannelPipelineFactory(int timeout,
int maxConnections, int maxConnectsPerIp, ChannelGroup group,
- ChannelHandlerFactory frameHandlerFactory) {
- super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory);
+ ChannelHandlerFactory frameHandlerFactory,
+ EventExecutorGroup eventExecutorGroup) {
+ super(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup);
}
public AbstractSSLAwareChannelPipelineFactory(int timeout,
int maxConnections, int maxConnectsPerIp, ChannelGroup group, Encryption secure,
- ChannelHandlerFactory frameHandlerFactory) {
- this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory);
+ ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+ this(timeout, maxConnections, maxConnectsPerIp, group, frameHandlerFactory, eventExecutorGroup);
this.secure = secure;
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
index 08086a6..1cde815 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
@@ -51,6 +51,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
@@ -67,16 +68,18 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
protected final Protocol protocol;
protected final ProtocolHandlerChain chain;
protected final Encryption secure;
+ private final EventExecutorGroup eventExecutors;
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
- this(mdcContextFactory, protocol, null);
+ public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) {
+ this(mdcContextFactory, protocol, null, eventExecutors);
}
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
+ public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup eventExecutors) {
this.mdcContextFactory = mdcContextFactory;
this.protocol = protocol;
this.chain = protocol.getProtocolChain();
this.secure = secure;
+ this.eventExecutors = eventExecutors;
}
@@ -197,7 +200,7 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
engine = secure.createSSLEngine();
}
- return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
+ return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine, eventExecutors));
}
@Override
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
index 36d5ab8..bdeee67 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
@@ -47,11 +48,13 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
private final Channel channel;
private final SSLEngine engine;
+ private final EventExecutorGroup eventExecutors;
private int lineHandlerCount = 0;
- public NettyProtocolTransport(Channel channel, SSLEngine engine) {
+ public NettyProtocolTransport(Channel channel, SSLEngine engine, EventExecutorGroup eventExecutors) {
this.channel = channel;
this.engine = engine;
+ this.eventExecutors = eventExecutors;
}
@Override
@@ -156,7 +159,7 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
// it is executed with the same ExecutorHandler as the coreHandler (if one exist)
//
// See JAMES-1277
- channel.pipeline().addBefore(HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
+ channel.pipeline().addBefore(eventExecutors, HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index 35da9a9..e50a498 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -28,8 +28,9 @@ import org.apache.james.protocols.api.Protocol;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
-
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
/**
@@ -104,7 +105,7 @@ public class NettyServer extends AbstractAsyncServer {
}
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
+ return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure, new DefaultEventExecutorGroup(2));
}
@Override
@@ -126,7 +127,8 @@ public class NettyServer extends AbstractAsyncServer {
maxCurConnectionsPerIP,
group,
secure,
- getFrameHandlerFactory()) {
+ getFrameHandlerFactory(),
+ new DefaultEventLoopGroup(16)) {
@Override
protected ChannelInboundHandlerAdapter createHandler() {
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 7896835..649f4a8 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -211,7 +211,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
@Override
protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) {
- return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory()) {
+ return new AbstractChannelPipelineFactory(group, getFrameHandlerFactory(), getExecutorGroup()) {
@Override
protected ChannelInboundHandlerAdapter createHandler() {
@@ -235,7 +235,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
// Add the text line decoder which limit the max line length,
// don't strip the delimiter and use CRLF as delimiter
// Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
- pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline));
+ pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline));
Encryption secure = getEncryption();
if (secure != null && !secure.isStartTLS()) {
@@ -248,11 +248,11 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
}
pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
- pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
+ pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
- pipeline.addLast(REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
+ pipeline.addLast(getExecutorGroup(), REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit));
- pipeline.addLast(CORE_HANDLER, createHandler());
+ pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createCoreHandler());
}
};
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
index 1ccbd84..1da9ce6 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractConfigurableAsyncServer.java
@@ -48,6 +48,7 @@ import org.apache.james.protocols.lib.jmx.ServerMBean;
import org.apache.james.protocols.netty.AbstractAsyncServer;
import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
import org.apache.james.protocols.netty.ChannelHandlerFactory;
+import org.apache.james.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,8 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
import nl.altindag.ssl.SSLFactory;
import nl.altindag.ssl.util.PemUtils;
@@ -118,7 +121,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
private ChannelHandlerFactory frameHandlerFactory;
- private int maxExecutorThreads;
+ private EventExecutorGroup executorGroup;
private MBeanServer mbeanServer;
@@ -189,8 +192,8 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
int ioWorker = config.getInt("ioWorkerCount", DEFAULT_IO_WORKER_COUNT);
setIoWorkerCount(ioWorker);
- maxExecutorThreads = config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT);
-
+ executorGroup = new DefaultEventExecutorGroup(config.getInt("maxExecutorCount", DEFAULT_MAX_EXECUTOR_COUNT),
+ NamedThreadFactory.withName(jmxName));
configureHelloName(config);
@@ -266,6 +269,10 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
}
+ protected EventExecutorGroup getExecutorGroup() {
+ return executorGroup;
+ }
+
@PostConstruct
public final void init() throws Exception {
@@ -308,6 +315,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
if (isEnabled()) {
unbind();
postDestroy();
+ executorGroup.shutdownGracefully();
unregisterMBean();
}
@@ -550,7 +558,7 @@ public abstract class AbstractConfigurableAsyncServer extends AbstractAsyncServe
@Override
protected AbstractChannelPipelineFactory createPipelineFactory(ChannelGroup group) {
return new AbstractExecutorAwareChannelPipelineFactory(getTimeout(), connectionLimit, connPerIP, group,
- getEncryption(), getFrameHandlerFactory()) {
+ getEncryption(), getFrameHandlerFactory(), getExecutorGroup()) {
@Override
protected ChannelInboundHandlerAdapter createHandler() {
diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
index a4271eb..4890f3c 100644
--- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
+++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/AbstractExecutorAwareChannelPipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.james.protocols.netty.ChannelHandlerFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
* Abstract base class which should get used if you MAY need an {@link ExecutionHandler}
@@ -35,8 +36,8 @@ public abstract class AbstractExecutorAwareChannelPipelineFactory extends Abstra
public AbstractExecutorAwareChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp,
ChannelGroup group, Encryption encryption,
- ChannelHandlerFactory frameHandlerFactory) {
- super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory);
+ ChannelHandlerFactory frameHandlerFactory, EventExecutorGroup eventExecutorGroup) {
+ super(timeout, maxConnections, maxConnectsPerIp, group, encryption, frameHandlerFactory, eventExecutorGroup);
}
/**
diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
index 7899a7a..0596813 100644
--- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
+++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
@@ -142,7 +142,7 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
SMTPProtocol protocol = new SMTPProtocol(getProtocolHandlerChain(), lmtpConfig);
- return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics);
+ return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics, getExecutorGroup());
}
@Override
diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
index e9f35fa..294ddc7 100644
--- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
+++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java
@@ -86,7 +86,7 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement
@Override
protected AbstractChannelPipelineFactory createPipelineFactory(final ChannelGroup group) {
- return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory()) {
+ return new AbstractChannelPipelineFactory(group, createFrameHandlerFactory(), getExecutorGroup()) {
@Override
protected ChannelInboundHandlerAdapter createHandler() {
@@ -113,13 +113,13 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement
// Add the text line decoder which limit the max line length,
// don't strip the delimiter and use CRLF as delimiter
// Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436
- pipeline.addLast(FRAMER, getFrameHandlerFactory().create(pipeline));
- pipeline.addLast(CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
- pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
+ pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline));
+ pipeline.addLast(getExecutorGroup(), CONNECTION_COUNT_HANDLER, getConnectionCountHandler());
+ pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
- pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
- pipeline.addLast(CORE_HANDLER, createHandler());
- pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
+ pipeline.addLast(getExecutorGroup(), "stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
+ pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createHandler());
+ pipeline.addLast(getExecutorGroup(), "stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
}
};
diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index 242874a..f05ec6c 100644
--- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -77,7 +77,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
protected void preInit() throws Exception {
super.preInit();
POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
- coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
+ coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), getExecutorGroup());
}
@Override
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
index 8152716..5379db5 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
@@ -29,6 +29,7 @@ import org.apache.james.smtpserver.SMTPConstants;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutorGroup;
/**
* {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
@@ -38,13 +39,13 @@ public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler {
private final SmtpMetrics smtpMetrics;
- public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics) {
- super(new SMTPMDCContextFactory(), protocol, encryption);
+ public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+ super(new SMTPMDCContextFactory(), protocol, encryption, eventExecutorGroup);
this.smtpMetrics = smtpMetrics;
}
- public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics) {
- super(new SMTPMDCContextFactory(), protocol);
+ public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+ super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup);
this.smtpMetrics = smtpMetrics;
}
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 65d8904..83f1534 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -219,7 +219,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
}
};
- coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics);
+ coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org