You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/22 02:01:38 UTC
[james-project] 07/29: JAMES-3715 Fix SMTP pipelining
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 43205f9c2357516d95b8ff4405f9e6746727b14f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 7 14:48:32 2022 +0700
JAMES-3715 Fix SMTP pipelining
Concurrent modifications of the pipeline are not safe, embed the line handler
logic into the core handler to avoid bad surprises...
---
.../james/protocols/api/ProtocolSession.java | 6 ----
.../james/protocols/api/ProtocolSessionImpl.java | 5 ---
.../james/protocols/api/ProtocolTransport.java | 5 ---
.../api/AbstractProtocolTransportTest.java | 5 ---
.../netty/BasicChannelUpstreamHandler.java | 39 ++++++++++++++++------
.../james/protocols/netty/LineHandlerAware.java | 7 ++++
.../protocols/netty/NettyProtocolTransport.java | 27 ++++-----------
.../apache/james/protocols/netty/NettyServer.java | 12 ++-----
.../protocols/smtp/utils/BaseFakeSMTPSession.java | 5 ---
.../apache/james/pop3server/netty/POP3Server.java | 9 +++--
.../netty/SMTPChannelUpstreamHandler.java | 6 ++--
.../apache/james/smtpserver/netty/SMTPServer.java | 10 ++----
testing/base/src/main/resources/logback-test.xml | 5 +--
13 files changed, 54 insertions(+), 87 deletions(-)
diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
index a9dc57e..9410124 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
@@ -231,10 +231,4 @@ public interface ProtocolSession extends CommandDetectionSession {
* Pop the last command handler
*/
void popLineHandler();
-
- /**
- * Return the size of the pushed {@link LineHandler}
- * @return size of the pushed line handler
- */
- int getPushedLineHandlerCount();
}
diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
index 1e88275..0750f82 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
@@ -213,11 +213,6 @@ public class ProtocolSessionImpl implements ProtocolSession {
}
@Override
- public int getPushedLineHandlerCount() {
- return transport.getPushedLineHandlerCount();
- }
-
- @Override
public <T extends ProtocolSession> void pushLineHandler(LineHandler<T> overrideCommandHandler) {
transport.pushLineHandler(overrideCommandHandler, this);
}
diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
index add23e8..09f5377 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
@@ -73,11 +73,6 @@ public interface ProtocolTransport {
* Push a {@link LineHandler} in.
*/
void pushLineHandler(LineHandler<? extends ProtocolSession> overrideCommandHandler, ProtocolSession session);
-
- /**
- * Return the count of pushed {@link LineHandler}'s
- */
- int getPushedLineHandlerCount();
/**
diff --git a/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java b/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
index 7d6a91a..f6c4933 100644
--- a/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
+++ b/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
@@ -92,11 +92,6 @@ public class AbstractProtocolTransportTest {
}
@Override
- public int getPushedLineHandlerCount() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public InetSocketAddress getLocalAddress() {
throw new UnsupportedOperationException();
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
index 1cde815..8534a99 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
@@ -22,9 +22,11 @@ import static org.apache.james.protocols.api.ProtocolSession.State.Connection;
import java.io.Closeable;
import java.nio.channels.ClosedChannelException;
+import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedDeque;
import javax.net.ssl.SSLEngine;
@@ -44,21 +46,19 @@ import org.apache.james.util.MDCBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.AttributeKey;
-import io.netty.util.concurrent.EventExecutorGroup;
-
/**
* {@link ChannelInboundHandlerAdapter} which is used by the SMTPServer and other line based protocols
*/
-@Sharable
-public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
+public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements LineHandlerAware {
private static final Logger LOGGER = LoggerFactory.getLogger(BasicChannelUpstreamHandler.class);
public static final ProtocolSession.AttachmentKey<MDCBuilder> MDC_ATTRIBUTE_KEY = ProtocolSession.AttachmentKey.of("bound_MDC", MDCBuilder.class);
public static final AttributeKey<CommandDetectionSession> SESSION_ATTRIBUTE_KEY =
@@ -68,18 +68,17 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
protected final Protocol protocol;
protected final ProtocolHandlerChain chain;
protected final Encryption secure;
- private final EventExecutorGroup eventExecutors;
+ private final Deque<LineHandlerUpstreamHandler> behaviourOverrides = new ConcurrentLinkedDeque<>();
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) {
- this(mdcContextFactory, protocol, null, eventExecutors);
+ public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
+ this(mdcContextFactory, protocol, null);
}
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup eventExecutors) {
+ public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
this.mdcContextFactory = mdcContextFactory;
this.protocol = protocol;
this.chain = protocol.getProtocolChain();
this.secure = secure;
- this.eventExecutors = eventExecutors;
}
@@ -151,6 +150,12 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ LineHandlerUpstreamHandler override = Iterables.getFirst(behaviourOverrides, null);
+ if (override != null) {
+ override.channelRead(ctx, msg);
+ return;
+ }
+
try (Closeable closeable = mdc(ctx).build()) {
ProtocolSession pSession = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
LinkedList<LineHandler> lineHandlers = chain.getHandlers(LineHandler.class);
@@ -200,7 +205,7 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
engine = secure.createSSLEngine();
}
- return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine, eventExecutors));
+ return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
}
@Override
@@ -234,4 +239,16 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
}
}
+ @Override
+ public void pushLineHandler(LineHandlerUpstreamHandler lineHandlerUpstreamHandler) {
+ behaviourOverrides.addFirst(lineHandlerUpstreamHandler);
+ }
+
+ @Override
+ public void popLineHandler() {
+ if (!behaviourOverrides.isEmpty()) {
+ behaviourOverrides.removeFirst();
+ }
+ }
+
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java
new file mode 100644
index 0000000..516dadf
--- /dev/null
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java
@@ -0,0 +1,7 @@
+package org.apache.james.protocols.netty;
+
+public interface LineHandlerAware {
+ void pushLineHandler(LineHandlerUpstreamHandler lineHandlerUpstreamHandler);
+
+ void popLineHandler();
+}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
index bdeee67..c5cc824 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
@@ -38,7 +38,6 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
-import io.netty.util.concurrent.EventExecutorGroup;
/**
@@ -48,13 +47,10 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
private final Channel channel;
private final SSLEngine engine;
- private final EventExecutorGroup eventExecutors;
- private int lineHandlerCount = 0;
- public NettyProtocolTransport(Channel channel, SSLEngine engine, EventExecutorGroup eventExecutors) {
+ public NettyProtocolTransport(Channel channel, SSLEngine engine) {
this.channel = channel;
this.engine = engine;
- this.eventExecutors = eventExecutors;
}
@Override
@@ -80,15 +76,9 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
@Override
public void popLineHandler() {
- if (lineHandlerCount > 0) {
- channel.pipeline().remove("lineHandler" + lineHandlerCount);
- lineHandlerCount--;
- }
- }
-
- @Override
- public int getPushedLineHandlerCount() {
- return lineHandlerCount;
+ LineHandlerAware channelHandler = (LineHandlerAware) channel.pipeline()
+ .get(HandlerConstants.CORE_HANDLER);
+ channelHandler.popLineHandler();
}
/**
@@ -154,12 +144,9 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void pushLineHandler(LineHandler<? extends ProtocolSession> overrideCommandHandler, ProtocolSession session) {
- lineHandlerCount++;
- // Add the linehandler in front of the coreHandler so we can be sure
- // it is executed with the same ExecutorHandler as the coreHandler (if one exist)
- //
- // See JAMES-1277
- channel.pipeline().addBefore(eventExecutors, HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
+ LineHandlerAware channelHandler = (LineHandlerAware) channel.pipeline()
+ .get(HandlerConstants.CORE_HANDLER);
+ channelHandler.pushLineHandler(new LineHandlerUpstreamHandler(session, overrideCommandHandler));
}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index e50a498..82e0e4f 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -30,16 +30,13 @@ import com.google.common.base.Preconditions;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
/**
* Generic NettyServer
*/
public class NettyServer extends AbstractAsyncServer {
-
public static class Factory {
-
private Protocol protocol;
private Optional<Encryption> secure;
private Optional<ChannelHandlerFactory> frameHandlerFactory;
@@ -77,11 +74,7 @@ public class NettyServer extends AbstractAsyncServer {
protected final Encryption secure;
protected final Protocol protocol;
private final ChannelHandlerFactory frameHandlerFactory;
-
- private ChannelInboundHandlerAdapter coreHandler;
-
private int maxCurConnections;
-
private int maxCurConnectionsPerIP;
private NettyServer(Protocol protocol, Encryption secure, ChannelHandlerFactory frameHandlerFactory) {
@@ -105,12 +98,11 @@ public class NettyServer extends AbstractAsyncServer {
}
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure, new DefaultEventExecutorGroup(2));
+ return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
}
@Override
public synchronized void bind() throws Exception {
- coreHandler = createCoreHandler();
super.bind();
}
@@ -132,7 +124,7 @@ public class NettyServer extends AbstractAsyncServer {
@Override
protected ChannelInboundHandlerAdapter createHandler() {
- return coreHandler;
+ return createCoreHandler();
}
};
diff --git a/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java b/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
index 0954de6..b474f32 100644
--- a/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
+++ b/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
@@ -123,11 +123,6 @@ public class BaseFakeSMTPSession implements SMTPSession {
}
@Override
- public int getPushedLineHandlerCount() {
- throw new UnsupportedOperationException("Unimplemented Stub Method");
- }
-
- @Override
public Response newLineTooLongResponse() {
throw new UnsupportedOperationException("Unimplemented Stub Method");
}
diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index f05ec6c..8d18599 100644
--- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -41,8 +41,8 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
* The configuration data to be passed to the handler
*/
private final ProtocolConfiguration theConfigData = new POP3Configuration();
- private BasicChannelUpstreamHandler coreHandler;
-
+ private POP3Protocol protocol;
+
@Override
protected int getDefaultPort() {
return 110;
@@ -76,8 +76,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
@Override
protected void preInit() throws Exception {
super.preInit();
- POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
- coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), getExecutorGroup());
+ protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
}
@Override
@@ -87,7 +86,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return coreHandler;
+ return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
}
@Override
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
index 5379db5..df984d7 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
@@ -27,25 +27,23 @@ import org.apache.james.protocols.smtp.SMTPSession;
import org.apache.james.protocols.smtp.core.SMTPMDCContextFactory;
import org.apache.james.smtpserver.SMTPConstants;
-import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutorGroup;
/**
* {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
*/
-@Sharable
public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler {
private final SmtpMetrics smtpMetrics;
public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
- super(new SMTPMDCContextFactory(), protocol, encryption, eventExecutorGroup);
+ super(new SMTPMDCContextFactory(), protocol, encryption);
this.smtpMetrics = smtpMetrics;
}
public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
- super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup);
+ super(new SMTPMDCContextFactory(), protocol);
this.smtpMetrics = smtpMetrics;
}
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 83f1534..78dfa59 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -56,6 +56,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
*/
public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServerMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(SMTPServer.class);
+ private SMTPProtocol transport;
public enum AuthenticationAnnounceMode {
NEVER,
@@ -186,8 +187,6 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
private DNSService dns;
private String authorizedAddresses;
-
- private SMTPChannelUpstreamHandler coreHandler;
public SMTPServer(SmtpMetrics smtpMetrics) {
this.smtpMetrics = smtpMetrics;
@@ -211,15 +210,12 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
authorizedNetworks = new NetMatcher(networks, dns);
LOGGER.info("Authorized addresses: {}", authorizedNetworks);
}
- SMTPProtocol transport = new SMTPProtocol(getProtocolHandlerChain(), theConfigData) {
-
+ transport = new SMTPProtocol(getProtocolHandlerChain(), theConfigData) {
@Override
public ProtocolSession newSession(ProtocolTransport transport) {
return new ExtendedSMTPSession(theConfigData, transport);
}
-
};
- coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
}
@Override
@@ -389,7 +385,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return coreHandler;
+ return new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
}
@Override
diff --git a/testing/base/src/main/resources/logback-test.xml b/testing/base/src/main/resources/logback-test.xml
index ddcf72c..a09363d 100644
--- a/testing/base/src/main/resources/logback-test.xml
+++ b/testing/base/src/main/resources/logback-test.xml
@@ -17,10 +17,7 @@
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
</encoder>
- <immediateFlush>false</immediateFlush>
- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
- <level>ERROR</level>
- </filter>
+ <immediateFlush>true</immediateFlush>
</appender>
<root level="WARN">
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org