You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/03 03:48:20 UTC

[james-project] branch master updated: JAMES-3870 Group IMAP response line within TCP packets (#1364)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 0659f18f8d JAMES-3870 Group IMAP response line within TCP packets (#1364)
0659f18f8d is described below

commit 0659f18f8d624ba118903ca124b07bd730e89957
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Tue Jan 3 10:48:14 2023 +0700

    JAMES-3870 Group IMAP response line within TCP packets (#1364)
    
    Today each Imap response line is transmitted in a
    distinct TCP packet.
    
    This causes a lot of network overhead as
    => TCP headers are added for each response line.
       To give an idea a LIST response line is 35 bytes
       long but result in a 101 bytes TCP frame so a 188%
       overcost....
    => TCP ack are conducted for each line independently.
       An ACK is 66 bytes
    
    This is especially problematic for LIST, FETCH commands
    that actually result in many (100, 1000, maybe millions)
    response lines.
    
    We should try to limit the calls to "flush" with Netty,
    and force the flush only once per IMAP command (at the end
    of processing). Netty is free to transmit some data earlier
    if it's buffer states requires it.
    
    The entire mailbox list take 1 packet to transmit (1420
    bytes total for 41 mailboxes so ~35 bytes per mailbox)
    and a single ACK (66 bytes).
---
 .../apache/james/imap/api/process/ImapSession.java  |  4 ++++
 .../apache/james/imap/processor/IdleProcessor.java  |  5 +++--
 .../imapserver/netty/ChannelImapResponseWriter.java | 12 +++++++-----
 .../netty/ImapChannelUpstreamHandler.java           | 21 ++++++++++++++-------
 .../imapserver/netty/ImapLineHandlerAdapter.java    |  1 +
 .../james/imapserver/netty/NettyImapSession.java    |  7 +++++++
 6 files changed, 36 insertions(+), 14 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index 0e384c755b..35224b7ceb 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -263,4 +263,8 @@ public interface ImapSession extends CommandDetectionSession {
     }
 
     void schedule(Runnable runnable, Duration waitDelay);
+
+    default void flush() {
+
+    }
 }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
index a8cb8d5f53..b6fdf32eaa 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/IdleProcessor.java
@@ -97,7 +97,6 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
             if (sm != null) {
                 sm.unregisterIdle();
             }
-            session1.popLineHandler();
             if (!DONE.equals(line.toUpperCase(Locale.US))) {
                 String message = String.format("Continuation for IMAP IDLE was not understood. Expected 'DONE', got '%s'.", line);
                 StatusResponse response = getStatusResponseFactory()
@@ -109,6 +108,7 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
             } else {
                 okComplete(request, responder);
             }
+            session1.popLineHandler();
             idleActive.set(false);
         });
 
@@ -167,7 +167,8 @@ public class IdleProcessor extends AbstractMailboxProcessor<IdleRequest> impleme
 
         @Override
         public Publisher<Void> reactiveEvent(Event event) {
-            return unsolicitedResponses(session, responder, false);
+            return unsolicitedResponses(session, responder, false)
+                .then(Mono.fromRunnable(session::flush));
         }
 
         @Override
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
index 399b3f6f60..78d96fe75e 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ChannelImapResponseWriter.java
@@ -57,7 +57,7 @@ public class ChannelImapResponseWriter implements ImapResponseWriter {
     @Override
     public void write(byte[] buffer) throws IOException {
         if (channel.isActive()) {
-            channel.writeAndFlush(Unpooled.wrappedBuffer(buffer));
+            channel.write(Unpooled.wrappedBuffer(buffer));
         }
     }
 
@@ -72,16 +72,18 @@ public class ChannelImapResponseWriter implements ImapResponseWriter {
                 // See JAMES-1305 and JAMES-1306
                 ChannelPipeline cp = channel.pipeline();
                 if (zeroCopy && cp.get(SslHandler.class) == null && cp.get(ZlibEncoder.class) == null) {
-                    channel.writeAndFlush(new DefaultFileRegion(fc, fc.position(), literal.size()));
+                    channel.write(new DefaultFileRegion(fc, fc.position(), literal.size()));
                 } else {
-                    channel.writeAndFlush(new ChunkedNioFile(fc, 8192));
+                    channel.write(new ChunkedNioFile(fc, 8192));
                 }
             } else {
-                channel.writeAndFlush(new ChunkedStream(in));
+                channel.write(new ChunkedStream(in));
             }
         }
     }
     
-    
+    public void flush() {
+        channel.flush();
+    }
 
 }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index b0dd0e66f6..d451b4ed0a 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -181,10 +181,11 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
             LOGGER.info("Connection established from {}", address.getAddress().getHostAddress());
             imapConnectionsMetric.increment();
 
-            ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel()));
-
+            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+            ImapResponseComposer response = new ImapResponseComposerImpl(writer);
             // write hello to client
             response.untagged().message("OK").message(hello).end();
+            writer.flush();
             super.channelActive(ctx);
         }
 
@@ -249,8 +250,10 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
                 // command length."
                 //
                 // See also JAMES-1190
-                ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel()));
+                ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+                ImapResponseComposer response = new ImapResponseComposerImpl(writer);
                 response.untaggedResponse(ImapConstants.BAD + " failed. Maximum command line length exceeded");
+                writer.flush();
 
             } else if (cause instanceof ReactiveThrottler.RejectedException) {
                 manageRejectedException(ctx, (ReactiveThrottler.RejectedException) cause);
@@ -263,10 +266,12 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
     private void manageRejectedException(ChannelHandlerContext ctx, ReactiveThrottler.RejectedException cause) {
         if (cause.getImapMessage() instanceof AbstractImapRequest) {
             AbstractImapRequest req = (AbstractImapRequest) cause.getImapMessage();
-            ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel()));
-
+            ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+            ImapResponseComposer response = new ImapResponseComposerImpl(writer);
             new ResponseEncoder(encoder, response)
-            .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(), new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null));
+                .respond(new ImmutableStatusResponse(StatusResponse.Type.NO, req.getTag(), req.getCommand(),
+                    new HumanReadableText(cause.getClass().getName(), cause.getMessage()), null));
+            writer.flush();
         } else {
             manageUnknownError(ctx);
         }
@@ -306,7 +311,8 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
         imapCommandsMetric.increment();
         ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
         Attribute<Disposable> disposableAttribute = ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
-        ImapResponseComposer response = new ImapResponseComposerImpl(new ChannelImapResponseWriter(ctx.channel()));
+        ChannelImapResponseWriter writer = new ChannelImapResponseWriter(ctx.channel());
+        ImapResponseComposer response = new ImapResponseComposerImpl(writer);
         ImapMessage message = (ImapMessage) msg;
 
         beforeIDLEUponProcessing(ctx);
@@ -344,6 +350,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp
                         ctx.fireExceptionCaught(signal.getThrowable());
                     }
                     disposableAttribute.set(null);
+                    writer.flush();
                     ctx.fireChannelReadComplete();
                 }))
                 .contextWrite(ReactorUtils.context("imap", mdc(ctx))), message)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java
index c03e8d76ee..59febae27b 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapLineHandlerAdapter.java
@@ -49,6 +49,7 @@ public class ImapLineHandlerAdapter extends ChannelInboundHandlerAdapter {
         byte[] data = new byte[buf.readableBytes()];
         buf.readBytes(data);
         lineHandler.onLine(session, data);
+        ctx.channel().flush();
     }
 
 }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index 6c8b228d10..611e84561d 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -183,6 +183,7 @@ public class NettyImapSession implements ImapSession, NettyConstants {
         }
         executeSafely(() -> {
             runnable.run();
+            channel.flush();
             channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
             stopDetectingCommandInjection();
         });
@@ -229,6 +230,7 @@ public class NettyImapSession implements ImapSession, NettyConstants {
 
         executeSafely(() -> {
             runnable.run();
+            channel.flush();
             ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE);
             ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5);
 
@@ -307,4 +309,9 @@ public class NettyImapSession implements ImapSession, NettyConstants {
     public void schedule(Runnable runnable, Duration waitDelay) {
         channel.eventLoop().schedule(runnable, waitDelay.toMillis(), TimeUnit.MILLISECONDS);
     }
+
+    @Override
+    public void flush() {
+        channel.flush();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org