You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/22 02:01:38 UTC

[james-project] 07/29: JAMES-3715 Fix SMTP pipelining

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 43205f9c2357516d95b8ff4405f9e6746727b14f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 7 14:48:32 2022 +0700

    JAMES-3715 Fix SMTP pipelining
    
    Concurrent modifications of the pipeline are not safe, so embed the line
    handler logic into the core handler to avoid bad surprises.
---
 .../james/protocols/api/ProtocolSession.java       |  6 ----
 .../james/protocols/api/ProtocolSessionImpl.java   |  5 ---
 .../james/protocols/api/ProtocolTransport.java     |  5 ---
 .../api/AbstractProtocolTransportTest.java         |  5 ---
 .../netty/BasicChannelUpstreamHandler.java         | 39 ++++++++++++++++------
 .../james/protocols/netty/LineHandlerAware.java    |  7 ++++
 .../protocols/netty/NettyProtocolTransport.java    | 27 ++++-----------
 .../apache/james/protocols/netty/NettyServer.java  | 12 ++-----
 .../protocols/smtp/utils/BaseFakeSMTPSession.java  |  5 ---
 .../apache/james/pop3server/netty/POP3Server.java  |  9 +++--
 .../netty/SMTPChannelUpstreamHandler.java          |  6 ++--
 .../apache/james/smtpserver/netty/SMTPServer.java  | 10 ++----
 testing/base/src/main/resources/logback-test.xml   |  5 +--
 13 files changed, 54 insertions(+), 87 deletions(-)

diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
index a9dc57e..9410124 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSession.java
@@ -231,10 +231,4 @@ public interface ProtocolSession extends CommandDetectionSession {
      * Pop the last command handler 
      */
     void popLineHandler();
-    
-    /**
-     * Return the size of the pushed {@link LineHandler}
-     * @return size of the pushed line handler
-     */
-    int getPushedLineHandlerCount();
 }
diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
index 1e88275..0750f82 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolSessionImpl.java
@@ -213,11 +213,6 @@ public class ProtocolSessionImpl implements ProtocolSession {
     }
 
     @Override
-    public int getPushedLineHandlerCount() {
-        return transport.getPushedLineHandlerCount();
-    }
-
-    @Override
     public <T extends ProtocolSession> void pushLineHandler(LineHandler<T> overrideCommandHandler) {
         transport.pushLineHandler(overrideCommandHandler, this);
     }
diff --git a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
index add23e8..09f5377 100644
--- a/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
+++ b/protocols/api/src/main/java/org/apache/james/protocols/api/ProtocolTransport.java
@@ -73,11 +73,6 @@ public interface ProtocolTransport {
      * Push a {@link LineHandler} in.
      */
     void pushLineHandler(LineHandler<? extends ProtocolSession> overrideCommandHandler, ProtocolSession session);
-
-    /**
-     * Return the count of pushed {@link LineHandler}'s
-     */
-    int getPushedLineHandlerCount();
     
     
     /**
diff --git a/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java b/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
index 7d6a91a..f6c4933 100644
--- a/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
+++ b/protocols/api/src/test/java/org/apache/james/protocols/api/AbstractProtocolTransportTest.java
@@ -92,11 +92,6 @@ public class AbstractProtocolTransportTest {
             }
             
             @Override
-            public int getPushedLineHandlerCount() {
-                throw new UnsupportedOperationException();
-            }
-            
-            @Override
             public InetSocketAddress getLocalAddress() {
                 throw new UnsupportedOperationException();
             }
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
index 1cde815..8534a99 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
@@ -22,9 +22,11 @@ import static org.apache.james.protocols.api.ProtocolSession.State.Connection;
 
 import java.io.Closeable;
 import java.nio.channels.ClosedChannelException;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import javax.net.ssl.SSLEngine;
 
@@ -44,21 +46,19 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.util.AttributeKey;
-import io.netty.util.concurrent.EventExecutorGroup;
-
 
 /**
  * {@link ChannelInboundHandlerAdapter} which is used by the SMTPServer and other line based protocols
  */
-@Sharable
-public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
+public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements LineHandlerAware {
     private static final Logger LOGGER = LoggerFactory.getLogger(BasicChannelUpstreamHandler.class);
     public static final ProtocolSession.AttachmentKey<MDCBuilder> MDC_ATTRIBUTE_KEY = ProtocolSession.AttachmentKey.of("bound_MDC", MDCBuilder.class);
     public static final AttributeKey<CommandDetectionSession> SESSION_ATTRIBUTE_KEY =
@@ -68,18 +68,17 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
     protected final Protocol protocol;
     protected final ProtocolHandlerChain chain;
     protected final Encryption secure;
-    private final EventExecutorGroup eventExecutors;
+    private final Deque<LineHandlerUpstreamHandler> behaviourOverrides = new ConcurrentLinkedDeque<>();
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, EventExecutorGroup eventExecutors) {
-        this(mdcContextFactory, protocol, null, eventExecutors);
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
+        this(mdcContextFactory, protocol, null);
     }
 
-    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure, EventExecutorGroup eventExecutors) {
+    public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
         this.mdcContextFactory = mdcContextFactory;
         this.protocol = protocol;
         this.chain = protocol.getProtocolChain();
         this.secure = secure;
-        this.eventExecutors = eventExecutors;
     }
 
 
@@ -151,6 +150,12 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        LineHandlerUpstreamHandler override = Iterables.getFirst(behaviourOverrides, null);
+        if (override != null) {
+            override.channelRead(ctx, msg);
+            return;
+        }
+
         try (Closeable closeable = mdc(ctx).build()) {
             ProtocolSession pSession = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
             LinkedList<LineHandler> lineHandlers = chain.getHandlers(LineHandler.class);
@@ -200,7 +205,7 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
             engine = secure.createSSLEngine();
         }
 
-        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine, eventExecutors));
+        return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
     }
 
     @Override
@@ -234,4 +239,16 @@ public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
         }
     }
 
+    @Override
+    public void pushLineHandler(LineHandlerUpstreamHandler lineHandlerUpstreamHandler) {
+        behaviourOverrides.addFirst(lineHandlerUpstreamHandler);
+    }
+
+    @Override
+    public void popLineHandler() {
+        if (!behaviourOverrides.isEmpty()) {
+            behaviourOverrides.removeFirst();
+        }
+    }
+
 }
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java
new file mode 100644
index 0000000..516dadf
--- /dev/null
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/LineHandlerAware.java
@@ -0,0 +1,7 @@
+package org.apache.james.protocols.netty;
+
+public interface LineHandlerAware {
+    void pushLineHandler(LineHandlerUpstreamHandler lineHandlerUpstreamHandler);
+
+    void popLineHandler();
+}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
index bdeee67..c5cc824 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyProtocolTransport.java
@@ -38,7 +38,6 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.DefaultFileRegion;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.stream.ChunkedStream;
-import io.netty.util.concurrent.EventExecutorGroup;
 
 
 /**
@@ -48,13 +47,10 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
     
     private final Channel channel;
     private final SSLEngine engine;
-    private final EventExecutorGroup eventExecutors;
-    private int lineHandlerCount = 0;
     
-    public NettyProtocolTransport(Channel channel, SSLEngine engine, EventExecutorGroup eventExecutors) {
+    public NettyProtocolTransport(Channel channel, SSLEngine engine) {
         this.channel = channel;
         this.engine = engine;
-        this.eventExecutors = eventExecutors;
     }
 
     @Override
@@ -80,15 +76,9 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
 
     @Override
     public void popLineHandler() {
-        if (lineHandlerCount > 0) {
-            channel.pipeline().remove("lineHandler" + lineHandlerCount);
-            lineHandlerCount--;
-        }
-    }
-
-    @Override
-    public int getPushedLineHandlerCount() {
-        return lineHandlerCount;
+        LineHandlerAware channelHandler = (LineHandlerAware) channel.pipeline()
+            .get(HandlerConstants.CORE_HANDLER);
+        channelHandler.popLineHandler();
     }
 
     /**
@@ -154,12 +144,9 @@ public class NettyProtocolTransport extends AbstractProtocolTransport {
     @Override
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public void pushLineHandler(LineHandler<? extends ProtocolSession> overrideCommandHandler, ProtocolSession session) {
-        lineHandlerCount++;
-        // Add the linehandler in front of the coreHandler so we can be sure 
-        // it is executed with the same ExecutorHandler as the coreHandler (if one exist)
-        // 
-        // See JAMES-1277
-        channel.pipeline().addBefore(eventExecutors, HandlerConstants.CORE_HANDLER, "lineHandler" + lineHandlerCount, new LineHandlerUpstreamHandler(session, overrideCommandHandler));
+        LineHandlerAware channelHandler = (LineHandlerAware) channel.pipeline()
+            .get(HandlerConstants.CORE_HANDLER);
+        channelHandler.pushLineHandler(new LineHandlerUpstreamHandler(session, overrideCommandHandler));
     }
     
    
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index e50a498..82e0e4f 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -30,16 +30,13 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.DefaultEventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
 
 
 /**
  * Generic NettyServer 
  */
 public class NettyServer extends AbstractAsyncServer {
-
     public static class Factory {
-
         private Protocol protocol;
         private Optional<Encryption> secure;
         private Optional<ChannelHandlerFactory> frameHandlerFactory;
@@ -77,11 +74,7 @@ public class NettyServer extends AbstractAsyncServer {
     protected final Encryption secure;
     protected final Protocol protocol;
     private final ChannelHandlerFactory frameHandlerFactory;
-
-    private ChannelInboundHandlerAdapter coreHandler;
-
     private int maxCurConnections;
-
     private int maxCurConnectionsPerIP;
    
     private NettyServer(Protocol protocol, Encryption secure, ChannelHandlerFactory frameHandlerFactory) {
@@ -105,12 +98,11 @@ public class NettyServer extends AbstractAsyncServer {
     }
 
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure, new DefaultEventExecutorGroup(2));
+        return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
     }
     
     @Override
     public synchronized void bind() throws Exception {
-        coreHandler = createCoreHandler();
         super.bind();
     }
 
@@ -132,7 +124,7 @@ public class NettyServer extends AbstractAsyncServer {
 
             @Override
             protected ChannelInboundHandlerAdapter createHandler() {
-                return coreHandler;
+                return createCoreHandler();
             }
         };
 
diff --git a/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java b/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
index 0954de6..b474f32 100644
--- a/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
+++ b/protocols/smtp/src/test/java/org/apache/james/protocols/smtp/utils/BaseFakeSMTPSession.java
@@ -123,11 +123,6 @@ public class BaseFakeSMTPSession implements SMTPSession {
     }
 
     @Override
-    public int getPushedLineHandlerCount() {
-        throw new UnsupportedOperationException("Unimplemented Stub Method");
-    }
-
-    @Override
     public Response newLineTooLongResponse() {
         throw new UnsupportedOperationException("Unimplemented Stub Method");
     }
diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index f05ec6c..8d18599 100644
--- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -41,8 +41,8 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
      * The configuration data to be passed to the handler
      */
     private final ProtocolConfiguration theConfigData = new POP3Configuration();
-    private BasicChannelUpstreamHandler coreHandler;
-    
+    private POP3Protocol protocol;
+
     @Override
     protected int getDefaultPort() {
         return 110;
@@ -76,8 +76,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
     @Override
     protected void preInit() throws Exception {
         super.preInit();
-        POP3Protocol protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
-        coreHandler = new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption(), getExecutorGroup());
+        protocol = new POP3Protocol(getProtocolHandlerChain(), theConfigData);
     }
 
     @Override
@@ -87,7 +86,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
 
     @Override
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return coreHandler; 
+        return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
     }
 
     @Override
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
index 5379db5..df984d7 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
@@ -27,25 +27,23 @@ import org.apache.james.protocols.smtp.SMTPSession;
 import org.apache.james.protocols.smtp.core.SMTPMDCContextFactory;
 import org.apache.james.smtpserver.SMTPConstants;
 
-import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.EventExecutorGroup;
 
 /**
  * {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
  */
-@Sharable
 public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler {
 
     private final SmtpMetrics smtpMetrics;
 
     public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
-        super(new SMTPMDCContextFactory(), protocol, encryption, eventExecutorGroup);
+        super(new SMTPMDCContextFactory(), protocol, encryption);
         this.smtpMetrics = smtpMetrics;
     }
 
     public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
-        super(new SMTPMDCContextFactory(), protocol, eventExecutorGroup);
+        super(new SMTPMDCContextFactory(), protocol);
         this.smtpMetrics = smtpMetrics;
     }
 
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 83f1534..78dfa59 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -56,6 +56,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
  */
 public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServerMBean {
     private static final Logger LOGGER = LoggerFactory.getLogger(SMTPServer.class);
+    private SMTPProtocol transport;
 
     public enum AuthenticationAnnounceMode {
         NEVER,
@@ -186,8 +187,6 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
 
     private DNSService dns;
     private String authorizedAddresses;
-    
-    private SMTPChannelUpstreamHandler coreHandler;
 
     public SMTPServer(SmtpMetrics smtpMetrics) {
         this.smtpMetrics = smtpMetrics;
@@ -211,15 +210,12 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
             authorizedNetworks = new NetMatcher(networks, dns);
             LOGGER.info("Authorized addresses: {}", authorizedNetworks);
         }
-        SMTPProtocol transport = new SMTPProtocol(getProtocolHandlerChain(), theConfigData) {
-
+        transport = new SMTPProtocol(getProtocolHandlerChain(), theConfigData) {
             @Override
             public ProtocolSession newSession(ProtocolTransport transport) {
                 return new ExtendedSMTPSession(theConfigData, transport);
             }
-            
         };
-        coreHandler = new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
     }
 
     @Override
@@ -389,7 +385,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
 
     @Override
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return coreHandler;
+        return new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
     }
 
     @Override
diff --git a/testing/base/src/main/resources/logback-test.xml b/testing/base/src/main/resources/logback-test.xml
index ddcf72c..a09363d 100644
--- a/testing/base/src/main/resources/logback-test.xml
+++ b/testing/base/src/main/resources/logback-test.xml
@@ -17,10 +17,7 @@
                 <encoder>
                         <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
                 </encoder>
-                <immediateFlush>false</immediateFlush>
-                <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-                        <level>ERROR</level>
-                </filter>
+                <immediateFlush>true</immediateFlush>
         </appender>
 
         <root level="WARN">

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org