You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/09 01:35:30 UTC

[james-project] branch master updated (70cc7d388d -> 169d83405e)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 70cc7d388d JAMES-3747 Reactive implementation for RabbitMQ channelPool (#979)
     new 11fd6163a6 JAMES-3737 Extract methods in ImapRequestFrameDecoder
     new 9fc41cbe68 JAMES-3737 Extract methods in ImapRequestFrameDecoder: small enhancements
     new e0017446c6 JAMES-3737 Copy large APPEND to a file asynchronously
     new caff4d0781 JAMES-3737 Execute IMAP requests on eventLoop
     new 294cbd8000 JAMES-3737 StartTLS and compress no longer need a distinct writer
     new 1f2791f325 JAMES-3737 Avoid a race condition upon compress
     new b80c0ed81a JAMES-3737 Avoid a race condition upon STARTTLS
     new 0941a11ebf JAMES-3737 AuthenticateProcessor should disable reads when adding a line handler
     new 169d83405e JAMES-3737 Backport resetReaderIndex fix

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/imap/api/process/ImapSession.java |   9 +-
 .../apache/james/imap/encode/FakeImapSession.java  |   5 +-
 .../imap/processor/AbstractMailboxProcessor.java   |   4 +-
 .../imap/processor/AuthenticateProcessor.java      |  12 +-
 .../james/imap/processor/CompressProcessor.java    |   3 +-
 .../james/imap/processor/StartTLSProcessor.java    |   3 +-
 .../apache/james/imapserver/netty/IMAPServer.java  |   4 +-
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 225 +++++++++++++--------
 .../james/imapserver/netty/NettyImapSession.java   |  74 ++++---
 9 files changed, 196 insertions(+), 143 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/09: JAMES-3737 Extract methods in ImapRequestFrameDecoder: small enhancements

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9fc41cbe6801414fe6dabe56972231be90f2d3af
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 10:20:44 2022 +0700

    JAMES-3737 Extract methods in ImapRequestFrameDecoder: small enhancements
    
     - Reduce counts of parameters
     -Also avoid a double map lookup
---
 .../james/imapserver/netty/ImapRequestFrameDecoder.java      | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index 09bcbb0c53..ab375b356e 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -56,6 +56,7 @@ import io.netty.handler.codec.ByteToMessageDecoder;
 public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements NettyConstants, LineHandlerAware {
     @VisibleForTesting
     static final String NEEDED_DATA = "NEEDED_DATA";
+    private static final boolean RETRY = true;
     private static final String STORED_DATA = "STORED_DATA";
     private static final String WRITTEN_DATA = "WRITTEN_DATA";
     private static final String OUTPUT_STREAM = "OUTPUT_STREAM";
@@ -155,9 +156,10 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         // check if we failed before and if we already know how much data we
         // need to sucess next run
         int size = -1;
-        if (attachment.containsKey(NEEDED_DATA)) {
+        final Object rawSize = attachment.get(NEEDED_DATA);
+        if (rawSize != null) {
             retry = true;
-            size = (Integer) attachment.get(NEEDED_DATA);
+            size = (Integer) rawSize;
             // now see if the buffer hold enough data to process.
             if (size != NettyImapRequestLineReader.NotEnoughDataException.UNKNOWN_SIZE && size > in.readableBytes()) {
 
@@ -167,7 +169,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
                     // ok seems like it will not fit in the memory limit so we
                     // need to store it in a temporary file
-                    reader = uploadToAFile(ctx, in, retry, attachment, size);
+                    reader = uploadToAFile(ctx, in, attachment, size);
                     if (reader == null) return null;
 
                 } else {
@@ -185,7 +187,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         return Pair.of(reader, size);
     }
 
-    private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, boolean retry, Map<String, Object> attachment, int size) throws IOException {
+    private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
         ImapRequestLineReader reader;
         final File f;
         int written;
@@ -228,7 +230,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
                 //ignore exception during close
             }
 
-            reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, retry);
+            reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
         } else {
             attachment.put(WRITTEN_DATA, written);
             return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/09: JAMES-3737 Copy large APPEND to a file asynchronously

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e0017446c6373fdbc229dbc01ed463e1c8d3f1eb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 11:23:21 2022 +0700

    JAMES-3737 Copy large APPEND to a file asynchronously
    
    This enable executing all IMAP requests on the event loop
---
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 100 +++++++++++++--------
 1 file changed, 62 insertions(+), 38 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index ab375b356e..fd4e2eec8a 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.imapserver.netty;
 
+import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -30,6 +32,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.imap.api.ImapMessage;
@@ -48,6 +51,9 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import reactor.core.Disposable;
+import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
 
 
 /**
@@ -60,6 +66,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
     private static final String STORED_DATA = "STORED_DATA";
     private static final String WRITTEN_DATA = "WRITTEN_DATA";
     private static final String OUTPUT_STREAM = "OUTPUT_STREAM";
+    private static final String SINK = "SINK";
+    private static final String SUBSCRIPTION = "SUBSCRIPTION";
+
 
     private final ImapDecoder decoder;
     private final int inMemorySizeLimit;
@@ -169,8 +178,8 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
                     // ok seems like it will not fit in the memory limit so we
                     // need to store it in a temporary file
-                    reader = uploadToAFile(ctx, in, attachment, size);
-                    if (reader == null) return null;
+                    uploadToAFile(ctx, in, attachment, size);
+                    return null;
 
                 } else {
                     in.resetReaderIndex();
@@ -187,55 +196,70 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         return Pair.of(reader, size);
     }
 
-    private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
-        ImapRequestLineReader reader;
+    private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
         final File f;
-        int written;
+        Sinks.Many<byte[]> sink;
 
         OutputStream outputStream;
         // check if we have created a temporary file already or if
         // we need to create a new one
         if (attachment.containsKey(STORED_DATA)) {
-            f = (File) attachment.get(STORED_DATA);
-            written = (Integer) attachment.get(WRITTEN_DATA);
-            outputStream = (OutputStream) attachment.get(OUTPUT_STREAM);
+            sink = (Sinks.Many<byte[]>) attachment.get(SINK);
         } else {
             f = File.createTempFile("imap-literal", ".tmp");
             attachment.put(STORED_DATA, f);
-            written = 0;
+            final AtomicInteger written = new AtomicInteger(0);
             attachment.put(WRITTEN_DATA, written);
             outputStream = new FileOutputStream(f, true);
             attachment.put(OUTPUT_STREAM, outputStream);
-
-        }
-
-
-        try {
-            int amount = Math.min(in.readableBytes(), size - written);
-            in.readBytes(outputStream, amount);
-            written += amount;
-        } catch (Exception e) {
-            try {
-                outputStream.close();
-            } catch (IOException ignored) {
-                //ignore exception during close
-            }
-            throw e;
-        }
-        // Check if all needed data was streamed to the file.
-        if (written == size) {
-            try {
-                outputStream.close();
-            } catch (IOException ignored) {
-                //ignore exception during close
-            }
-
-            reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
-        } else {
-            attachment.put(WRITTEN_DATA, written);
-            return null;
+            sink = Sinks.many().unicast().onBackpressureBuffer();
+            attachment.put(SINK, sink);
+
+            Disposable subscribe = sink.asFlux()
+                .publishOn(Schedulers.elastic())
+                .doOnNext(next -> {
+                    try {
+                        int amount = Math.min(next.length, size - written.get());
+                        outputStream.write(next, 0, amount);
+                        written.addAndGet(amount);
+                    } catch (Exception e) {
+                        try {
+                            outputStream.close();
+                        } catch (IOException ignored) {
+                            //ignore exception during close
+                        }
+                        throw new RuntimeException(e);
+                    }
+
+                    // Check if all needed data was streamed to the file.
+                    if (written.get() == size) {
+                        try {
+                            outputStream.close();
+                        } catch (IOException ignored) {
+                            //ignore exception during close
+                        }
+
+                        ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
+
+                        try {
+                            parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()))
+                                .ifPresent(ctx::fireChannelRead);
+                        } catch (DecodingException e) {
+                            ctx.fireExceptionCaught(e);
+                        }
+                    }
+                })
+                .subscribe(o -> {},
+                    ctx::fireExceptionCaught,
+                    () -> {
+
+                    });
+            attachment.put(SUBSCRIPTION, subscribe);
         }
-        return reader;
+        final int readableBytes = in.readableBytes();
+        final byte[] bytes = new byte[readableBytes];
+        in.readBytes(bytes);
+        sink.emitNext(bytes, FAIL_FAST);
     }
 
     public void disableFraming(ChannelHandlerContext ctx) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 08/09: JAMES-3737 AuthenticateProcessor should disable reads when adding a line handler

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 0941a11ebf74aefa68cffc11b5c30d42ab798759
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 5 07:03:01 2022 +0700

    JAMES-3737 AuthenticateProcessor should disable reads when adding a line handler
---
 .../apache/james/imap/api/process/ImapSession.java   |  4 ++++
 .../james/imap/processor/AuthenticateProcessor.java  | 12 +++++++-----
 .../james/imapserver/netty/NettyImapSession.java     | 20 ++++++++++++--------
 3 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index d0d0e9359d..65287b1f10 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -90,6 +90,10 @@ public interface ImapSession extends CommandDetectionSession {
      */
     SessionId sessionId();
 
+    default void executeSafely(Runnable runnable) {
+        runnable.run();
+    }
+
     /**
      * Logs out the session. Marks the connection for closure;
      */
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java
index 598333f5eb..590242dfa3 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AuthenticateProcessor.java
@@ -79,11 +79,13 @@ public class AuthenticateProcessor extends AbstractAuthProcessor<AuthenticateReq
                     IRAuthenticateRequest irRequest = (IRAuthenticateRequest) request;
                     doPlainAuth(irRequest.getInitialClientResponse(), session, request, responder);
                 } else {
-                    responder.respond(new AuthenticateResponse());
-                    session.pushLineHandler((requestSession, data) -> {
-                        doPlainAuth(extractInitialClientResponse(data), requestSession, request, responder);
-                        // remove the handler now
-                        requestSession.popLineHandler();
+                    session.executeSafely(() -> {
+                        responder.respond(new AuthenticateResponse());
+                        session.pushLineHandler((requestSession, data) -> {
+                            doPlainAuth(extractInitialClientResponse(data), requestSession, request, responder);
+                            // remove the handler now
+                            requestSession.popLineHandler();
+                        });
                     });
                 }
             }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index 5e2dd29cf8..fb47fc2609 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -95,6 +95,16 @@ public class NettyImapSession implements ImapSession, NettyConstants {
         return sessionId;
     }
 
+    @Override
+    public void executeSafely(Runnable runnable) {
+        channel.eventLoop().execute(() -> {
+            channel.config().setAutoRead(false);
+            runnable.run();
+
+            channel.config().setAutoRead(true);
+        });
+    }
+
     @Override
     public Mono<Void> logout() {
         return closeMailbox()
@@ -166,13 +176,10 @@ public class NettyImapSession implements ImapSession, NettyConstants {
         if (!supportStartTLS()) {
             return false;
         }
-        channel.eventLoop().execute(() -> {
-            channel.config().setAutoRead(false);
+        executeSafely(() -> {
             runnable.run();
-
             channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
             stopDetectingCommandInjection();
-            channel.config().setAutoRead(true);
         });
 
         return true;
@@ -215,8 +222,7 @@ public class NettyImapSession implements ImapSession, NettyConstants {
             return false;
         }
 
-        channel.eventLoop().execute(() -> {
-            channel.config().setAutoRead(false);
+        executeSafely(() -> {
             runnable.run();
             ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE);
             ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5);
@@ -232,8 +238,6 @@ public class NettyImapSession implements ImapSession, NettyConstants {
                 channel.pipeline().addAfter(SSL_HANDLER, ZLIB_DECODER, decoder);
                 channel.pipeline().addAfter(SSL_HANDLER, ZLIB_ENCODER, encoder);
             }
-
-            channel.config().setAutoRead(true);
         });
 
         return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/09: JAMES-3737 Extract methods in ImapRequestFrameDecoder

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 11fd6163a60c93a4a8a8a5a23f54bae2574f6a9e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 10:09:53 2022 +0700

    JAMES-3737 Extract methods in ImapRequestFrameDecoder
---
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 197 ++++++++++++---------
 1 file changed, 111 insertions(+), 86 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index aa97885abb..09bcbb0c53 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -27,12 +27,15 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.imap.api.ImapMessage;
 import org.apache.james.imap.api.ImapSessionState;
 import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.decode.DecodingException;
 import org.apache.james.imap.decode.ImapDecoder;
 import org.apache.james.imap.decode.ImapRequestLineReader;
 import org.apache.james.protocols.netty.LineHandlerAware;
@@ -87,12 +90,70 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         }
 
         int readerIndex = in.readerIndex();
-        boolean retry = false;
 
+        Map<String, Object> attachment = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get();
+
+        Pair<ImapRequestLineReader, Integer> readerAndSize = obtainReader(ctx, in, attachment);
+        if (readerAndSize == null) return;
+
+        parseImapMessage(ctx, in, attachment, readerAndSize, readerIndex)
+            .ifPresent(out::add);
+    }
+
+    private Optional<ImapMessage> parseImapMessage(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, Pair<ImapRequestLineReader, Integer> readerAndSize, int readerIndex) throws DecodingException {
+        ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+
+        // check if the session was removed before to prevent a harmless NPE. See JAMES-1312
+        // Also check if the session was logged out if so there is not need to try to decode it. See JAMES-1341
+        if (session != null && session.getState() != ImapSessionState.LOGOUT) {
+            try {
+
+                ImapMessage message = decoder.decode(readerAndSize.getLeft(), session);
+
+                // if size is != -1 the case was a literal. if thats the case we
+                // should not consume the line
+                // See JAMES-1199
+                if (readerAndSize.getRight() == -1) {
+                    readerAndSize.getLeft().consumeLine();
+                }
+
+                enableFraming(ctx);
+
+                attachment.clear();
+                return Optional.of(message);
+            } catch (NettyImapRequestLineReader.NotEnoughDataException e) {
+                // this exception was thrown because we don't have enough data yet
+                requestMoreData(ctx, in, attachment, e.getNeededSize(), readerIndex);
+            }
+        } else {
+            // The session was null so may be the case because the channel was already closed but there were still bytes in the buffer.
+            // We now try to disconnect the client if still connected
+            if (ctx.channel().isActive()) {
+                ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private void requestMoreData(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int neededData, int readerIndex) {
+        // store the needed data size for later usage
+        attachment.put(NEEDED_DATA, neededData);
+
+        // SwitchableDelimiterBasedFrameDecoder added further to JAMES-1436.
+        disableFraming(ctx);
+        if (in.readableBytes() > 0) {
+            ByteBuf spareBytes = in.retainedDuplicate();
+            internalBuffer().clear();
+            ctx.fireChannelRead(spareBytes);
+        }
+        in.readerIndex(readerIndex);
+    }
+
+    private Pair<ImapRequestLineReader, Integer> obtainReader(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment) throws IOException {
+        boolean retry = false;
         ImapRequestLineReader reader;
         // check if we failed before and if we already know how much data we
         // need to sucess next run
-        Map<String, Object> attachment = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get();
         int size = -1;
         if (attachment.containsKey(NEEDED_DATA)) {
             retry = true;
@@ -106,56 +167,12 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
                     // ok seems like it will not fit in the memory limit so we
                     // need to store it in a temporary file
-                    final File f;
-                    int written;
-
-                    OutputStream outputStream;
-                    // check if we have created a temporary file already or if
-                    // we need to create a new one
-                    if (attachment.containsKey(STORED_DATA)) {
-                        f = (File) attachment.get(STORED_DATA);
-                        written = (Integer) attachment.get(WRITTEN_DATA);
-                        outputStream = (OutputStream) attachment.get(OUTPUT_STREAM);
-                    } else {
-                        f = File.createTempFile("imap-literal", ".tmp");
-                        attachment.put(STORED_DATA, f);
-                        written = 0;
-                        attachment.put(WRITTEN_DATA, written);
-                        outputStream = new FileOutputStream(f, true);
-                        attachment.put(OUTPUT_STREAM, outputStream);
-
-                    }
-
-
-                    try {
-                        int amount = Math.min(in.readableBytes(), size - written);
-                        in.readBytes(outputStream, amount);
-                        written += amount;
-                    } catch (Exception e) {
-                        try {
-                            outputStream.close();
-                        } catch (IOException ignored) {
-                            //ignore exception during close
-                        }
-                        throw e;
-                    }
-                    // Check if all needed data was streamed to the file.
-                    if (written == size) {
-                        try {
-                            outputStream.close();
-                        } catch (IOException ignored) {
-                            //ignore exception during close
-                        }
-
-                        reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, retry);
-                    } else {
-                        attachment.put(WRITTEN_DATA, written);
-                        return;
-                    }
+                    reader = uploadToAFile(ctx, in, retry, attachment, size);
+                    if (reader == null) return null;
 
                 } else {
                     in.resetReaderIndex();
-                    return;
+                    return null;
                 }
 
             } else {
@@ -165,50 +182,58 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         } else {
             reader = new NettyImapRequestLineReader(ctx.channel(), in, retry, literalSizeLimit);
         }
+        return Pair.of(reader, size);
+    }
 
-        ImapSession session = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+    private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, boolean retry, Map<String, Object> attachment, int size) throws IOException {
+        ImapRequestLineReader reader;
+        final File f;
+        int written;
+
+        OutputStream outputStream;
+        // check if we have created a temporary file already or if
+        // we need to create a new one
+        if (attachment.containsKey(STORED_DATA)) {
+            f = (File) attachment.get(STORED_DATA);
+            written = (Integer) attachment.get(WRITTEN_DATA);
+            outputStream = (OutputStream) attachment.get(OUTPUT_STREAM);
+        } else {
+            f = File.createTempFile("imap-literal", ".tmp");
+            attachment.put(STORED_DATA, f);
+            written = 0;
+            attachment.put(WRITTEN_DATA, written);
+            outputStream = new FileOutputStream(f, true);
+            attachment.put(OUTPUT_STREAM, outputStream);
 
-        // check if the session was removed before to prevent a harmless NPE. See JAMES-1312
-        // Also check if the session was logged out if so there is not need to try to decode it. See JAMES-1341
-        if (session != null && session.getState() != ImapSessionState.LOGOUT) {
-            try {
+        }
 
-                ImapMessage message = decoder.decode(reader, session);
 
-                // if size is != -1 the case was a literal. if thats the case we
-                // should not consume the line
-                // See JAMES-1199
-                if (size == -1) {
-                    reader.consumeLine();
-                }
-                
-                enableFraming(ctx);
-                
-                attachment.clear();
-                out.add(message);
-            } catch (NettyImapRequestLineReader.NotEnoughDataException e) {
-                // this exception was thrown because we don't have enough data
-                // yet
-                int neededData = e.getNeededSize();
-                // store the needed data size for later usage
-                attachment.put(NEEDED_DATA, neededData);
-
-                // SwitchableDelimiterBasedFrameDecoder added further to JAMES-1436.
-                disableFraming(ctx);
-                if (in.readableBytes() > 0) {
-                    ByteBuf spareBytes = in.retainedDuplicate();
-                    internalBuffer().clear();
-                    ctx.fireChannelRead(spareBytes);
-                }
-                in.readerIndex(readerIndex);
+        try {
+            int amount = Math.min(in.readableBytes(), size - written);
+            in.readBytes(outputStream, amount);
+            written += amount;
+        } catch (Exception e) {
+            try {
+                outputStream.close();
+            } catch (IOException ignored) {
+                //ignore exception during close
             }
-        } else {
-            // The session was null so may be the case because the channel was already closed but there were still bytes in the buffer.
-            // We now try to disconnect the client if still connected
-            if (ctx.channel().isActive()) {
-                ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            throw e;
+        }
+        // Check if all needed data was streamed to the file.
+        if (written == size) {
+            try {
+                outputStream.close();
+            } catch (IOException ignored) {
+                //ignore exception during close
             }
+
+            reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, retry);
+        } else {
+            attachment.put(WRITTEN_DATA, written);
+            return null;
         }
+        return reader;
     }
 
     public void disableFraming(ChannelHandlerContext ctx) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 09/09: JAMES-3737 Backport resetReaderIndex fix

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 169d83405ec8da1070542822c73a4f2da988b1fe
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 5 18:27:41 2022 +0700

    JAMES-3737 Backport resetReaderIndex fix
---
 .../apache/james/imapserver/netty/ImapRequestFrameDecoder.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index fc51b68c97..9cd437eaf9 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -103,7 +103,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
         Map<String, Object> attachment = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get();
 
-        Pair<ImapRequestLineReader, Integer> readerAndSize = obtainReader(ctx, in, attachment);
+        Pair<ImapRequestLineReader, Integer> readerAndSize = obtainReader(ctx, in, attachment, readerIndex);
         if (readerAndSize == null) {
             return;
         }
@@ -161,7 +161,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         in.readerIndex(readerIndex);
     }
 
-    private Pair<ImapRequestLineReader, Integer> obtainReader(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment) throws IOException {
+    private Pair<ImapRequestLineReader, Integer> obtainReader(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int readerIndex) throws IOException {
         boolean retry = false;
         ImapRequestLineReader reader;
         // check if we failed before and if we already know how much data we
@@ -180,7 +180,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
                     // ok seems like it will not fit in the memory limit so we
                     // need to store it in a temporary file
-                    uploadToAFile(ctx, in, attachment, size);
+                    uploadToAFile(ctx, in, attachment, size, readerIndex);
                     return null;
 
                 } else {
@@ -198,7 +198,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         return Pair.of(reader, size);
     }
 
-    private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
+    private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size, int readerIndex) throws IOException {
         final File f;
         Sinks.Many<byte[]> sink;
 
@@ -244,7 +244,7 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
                         ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
 
                         try {
-                            parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()))
+                            parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()), readerIndex)
                                 .ifPresent(ctx::fireChannelRead);
                         } catch (DecodingException e) {
                             ctx.fireExceptionCaught(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/09: JAMES-3737 Execute IMAP requests on eventLoop

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit caff4d0781d0f60ea75013d8528279e81e53fd69
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Apr 18 21:12:26 2022 +0700

    JAMES-3737 Execute IMAP requests on eventLoop
    
    Avoids context switches on each IMAP requests
---
 .../org/apache/james/imap/processor/AbstractMailboxProcessor.java | 4 +++-
 .../main/java/org/apache/james/imapserver/netty/IMAPServer.java   | 4 ++--
 .../apache/james/imapserver/netty/ImapRequestFrameDecoder.java    | 8 ++++++--
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
index 31db867e2a..1dc0ffce2f 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
@@ -71,6 +71,7 @@ import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public abstract class AbstractMailboxProcessor<R extends ImapRequest> extends AbstractProcessor<R> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMailboxProcessor.class);
@@ -355,7 +356,8 @@ public abstract class AbstractMailboxProcessor<R extends ImapRequest> extends Ab
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-        }));
+        })).subscribeOn(Schedulers.elastic())
+            .then();
     }
 
     /**
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index b220446912..ff39b32d44 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -242,10 +242,10 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
 
                 pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
 
-                pipeline.addLast(getExecutorGroup(), REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit,
+                pipeline.addLast(REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit,
                     literalSizeLimit, maxLineLength));
 
-                pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createCoreHandler());
+                pipeline.addLast(CORE_HANDLER, createCoreHandler());
             }
 
         };
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index fd4e2eec8a..fc51b68c97 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -104,7 +104,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         Map<String, Object> attachment = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get();
 
         Pair<ImapRequestLineReader, Integer> readerAndSize = obtainReader(ctx, in, attachment);
-        if (readerAndSize == null) return;
+        if (readerAndSize == null) {
+            return;
+        }
 
         parseImapMessage(ctx, in, attachment, readerAndSize, readerIndex)
             .ifPresent(out::add);
@@ -249,7 +251,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
                         }
                     }
                 })
-                .subscribe(o -> {},
+                .subscribe(o -> {
+
+                    },
                     ctx::fireExceptionCaught,
                     () -> {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/09: JAMES-3737 StartTLS and compress no longer need a distinct writer

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 294cbd80003a17581c71c0b44edab0c30a0e3c06
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 20:12:49 2022 +0700

    JAMES-3737 StartTLS and compress no longer need a distinct writer
    
    They run on an eventLoop thus are immediately applied
---
 .../apache/james/imap/api/process/ImapSession.java   |  5 ++---
 .../apache/james/imap/encode/FakeImapSession.java    |  5 ++---
 .../james/imap/processor/CompressProcessor.java      |  4 ++--
 .../james/imap/processor/StartTLSProcessor.java      |  4 ++--
 .../james/imapserver/netty/NettyImapSession.java     | 20 ++------------------
 5 files changed, 10 insertions(+), 28 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index d6de96a27e..f2938af6d6 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -26,7 +26,6 @@ import java.util.Optional;
 import org.apache.commons.text.RandomStringGenerator;
 import org.apache.james.core.Username;
 import org.apache.james.imap.api.ImapSessionState;
-import org.apache.james.imap.message.response.ImmutableStatusResponse;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.protocols.api.CommandDetectionSession;
 import org.apache.james.protocols.api.OidcSASLConfiguration;
@@ -160,7 +159,7 @@ public interface ImapSession extends CommandDetectionSession {
      * 
      * @return true if the encryption of the session was successfully
      */
-    boolean startTLS(ImmutableStatusResponse startTlsResponse);
+    boolean startTLS();
 
     /**
      * Return true if the session is bound to a TLS encrypted socket.
@@ -196,7 +195,7 @@ public interface ImapSession extends CommandDetectionSession {
      * 
      * @return success
      */
-    boolean startCompression(ImmutableStatusResponse response);
+    boolean startCompression();
 
     /**
      * Push in a new {@link ImapLineHandler} which is called for the next line received
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
index 9f0e675831..07d9a772cc 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
@@ -31,7 +31,6 @@ import org.apache.james.imap.api.ImapSessionState;
 import org.apache.james.imap.api.process.ImapLineHandler;
 import org.apache.james.imap.api.process.ImapSession;
 import org.apache.james.imap.api.process.SelectedMailbox;
-import org.apache.james.imap.message.response.ImmutableStatusResponse;
 import org.apache.james.protocols.api.OidcSASLConfiguration;
 import org.apache.james.util.concurrent.NamedThreadFactory;
 
@@ -131,7 +130,7 @@ public class FakeImapSession implements ImapSession {
     }
     
     @Override
-    public boolean startTLS(ImmutableStatusResponse response) {
+    public boolean startTLS() {
         return false;
     }
 
@@ -146,7 +145,7 @@ public class FakeImapSession implements ImapSession {
     }
 
     @Override
-    public boolean startCompression(ImmutableStatusResponse response) {
+    public boolean startCompression() {
         return false;
     }
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
index 8645a1da30..0deee52959 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
@@ -28,7 +28,6 @@ import org.apache.james.imap.api.message.response.StatusResponse;
 import org.apache.james.imap.api.message.response.StatusResponseFactory;
 import org.apache.james.imap.api.process.ImapSession;
 import org.apache.james.imap.message.request.CompressRequest;
-import org.apache.james.imap.message.response.ImmutableStatusResponse;
 import org.apache.james.imap.processor.base.AbstractProcessor;
 import org.apache.james.util.MDCBuilder;
 
@@ -60,7 +59,8 @@ public class CompressProcessor extends AbstractProcessor<CompressRequest> implem
                     } else {
                         StatusResponse response = factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.DEFLATE_ACTIVE);
 
-                        if (session.startCompression((ImmutableStatusResponse) response)) {
+                        responder.respond(response);
+                        if (session.startCompression()) {
                             session.setAttribute(COMPRESSED, true);
                         }
                     }
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
index e2aca8f6cd..c5e46b4a7e 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
@@ -27,7 +27,6 @@ import org.apache.james.imap.api.message.Capability;
 import org.apache.james.imap.api.message.response.StatusResponseFactory;
 import org.apache.james.imap.api.process.ImapSession;
 import org.apache.james.imap.message.request.StartTLSRequest;
-import org.apache.james.imap.message.response.ImmutableStatusResponse;
 import org.apache.james.imap.processor.base.AbstractProcessor;
 import org.apache.james.util.MDCBuilder;
 
@@ -51,7 +50,8 @@ public class StartTLSProcessor extends AbstractProcessor<StartTLSRequest> implem
     protected Mono<Void> doProcess(StartTLSRequest request, Responder responder, ImapSession session) {
         return Mono.fromRunnable(() -> {
             if (session.supportStartTLS()) {
-                session.startTLS((ImmutableStatusResponse) factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS));
+                responder.respond(factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS));
+                session.startTLS();
             } else {
                 responder.respond(factory.taggedBad(request.getTag(), request.getCommand(), HumanReadableText.UNKNOWN_COMMAND));
             }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index 76c0b88550..5f62999e4f 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -18,7 +18,6 @@
  ****************************************************************/
 package org.apache.james.imapserver.netty;
 
-import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,11 +30,7 @@ import org.apache.james.imap.api.process.ImapLineHandler;
 import org.apache.james.imap.api.process.ImapSession;
 import org.apache.james.imap.api.process.SelectedMailbox;
 import org.apache.james.imap.encode.ImapResponseWriter;
-import org.apache.james.imap.encode.StatusResponseEncoder;
-import org.apache.james.imap.encode.base.ImapResponseComposerImpl;
-import org.apache.james.imap.encode.main.DefaultLocalizer;
 import org.apache.james.imap.message.Literal;
-import org.apache.james.imap.message.response.ImmutableStatusResponse;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.protocols.api.OidcSASLConfiguration;
 import org.apache.james.protocols.netty.Encryption;
@@ -167,12 +162,11 @@ public class NettyImapSession implements ImapSession, NettyConstants {
     }
 
     @Override
-    public boolean startTLS(ImmutableStatusResponse statusResponse) {
+    public boolean startTLS() {
         if (!supportStartTLS()) {
             return false;
         }
         channel.config().setAutoRead(false);
-        write(statusResponse);
 
         channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
         stopDetectingCommandInjection();
@@ -181,15 +175,6 @@ public class NettyImapSession implements ImapSession, NettyConstants {
         return true;
     }
 
-    private void write(ImmutableStatusResponse statusResponse) {
-        try {
-            new StatusResponseEncoder(new DefaultLocalizer()).encode(statusResponse,
-                new ImapResponseComposerImpl(new EventLoopImapResponseWriter(channel), BUFFER_SIZE));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     public static class EventLoopImapResponseWriter implements ImapResponseWriter {
         private final Channel channel;
 
@@ -222,13 +207,12 @@ public class NettyImapSession implements ImapSession, NettyConstants {
     }
 
     @Override
-    public boolean startCompression(ImmutableStatusResponse response) {
+    public boolean startCompression() {
         if (!isCompressionSupported()) {
             return false;
         }
 
         channel.config().setAutoRead(false);
-        write(response);
         ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE);
         ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/09: JAMES-3737 Avoid a race condition upon compress

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1f2791f3256a92224aa11dbeaadb2af32c25c014
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 3 09:04:06 2022 +0700

    JAMES-3737 Avoid a race condition upon compress
    
    Compress response and activation was sent from
    the event loop causing them to be sent and
    activated prior earlier responses. This causes
    a race condition when several commands are
    batched.
---
 .../apache/james/imap/api/process/ImapSession.java |  2 +-
 .../apache/james/imap/encode/FakeImapSession.java  |  2 +-
 .../james/imap/processor/CompressProcessor.java    |  3 +-
 .../james/imapserver/netty/NettyImapSession.java   | 37 ++++++++++++----------
 4 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index f2938af6d6..62fd6bf599 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -195,7 +195,7 @@ public interface ImapSession extends CommandDetectionSession {
      * 
      * @return success
      */
-    boolean startCompression();
+    boolean startCompression(Runnable runnable);
 
     /**
      * Push in a new {@link ImapLineHandler} which is called for the next line received
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
index 07d9a772cc..776d98b205 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
@@ -145,7 +145,7 @@ public class FakeImapSession implements ImapSession {
     }
 
     @Override
-    public boolean startCompression() {
+    public boolean startCompression(Runnable runnable) {
         return false;
     }
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
index 0deee52959..a350901647 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/CompressProcessor.java
@@ -59,8 +59,7 @@ public class CompressProcessor extends AbstractProcessor<CompressRequest> implem
                     } else {
                         StatusResponse response = factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.DEFLATE_ACTIVE);
 
-                        responder.respond(response);
-                        if (session.startCompression()) {
+                        if (session.startCompression(() -> responder.respond(response))) {
                             session.setAttribute(COMPRESSED, true);
                         }
                     }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index 5f62999e4f..d294c963c5 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -207,28 +207,31 @@ public class NettyImapSession implements ImapSession, NettyConstants {
     }
 
     @Override
-    public boolean startCompression() {
+    public boolean startCompression(Runnable runnable) {
         if (!isCompressionSupported()) {
             return false;
         }
 
-        channel.config().setAutoRead(false);
-        ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE);
-        ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5);
-
-        // Check if we have the SslHandler in the pipeline already
-        // if so we need to move the compress encoder and decoder
-        // behind it in the chain
-        // See JAMES-1186
-        if (channel.pipeline().get(SSL_HANDLER) == null) {
-            channel.pipeline().addFirst(ZLIB_DECODER, decoder);
-            channel.pipeline().addFirst(ZLIB_ENCODER, encoder);
-        } else {
-            channel.pipeline().addAfter(SSL_HANDLER, ZLIB_DECODER, decoder);
-            channel.pipeline().addAfter(SSL_HANDLER, ZLIB_ENCODER, encoder);
-        }
+        channel.eventLoop().execute(() -> {
+            channel.config().setAutoRead(false);
+            runnable.run();
+            ZlibDecoder decoder = new JZlibDecoder(ZlibWrapper.NONE);
+            ZlibEncoder encoder = new JZlibEncoder(ZlibWrapper.NONE, 5);
+
+            // Check if we have the SslHandler in the pipeline already
+            // if so we need to move the compress encoder and decoder
+            // behind it in the chain
+            // See JAMES-1186
+            if (channel.pipeline().get(SSL_HANDLER) == null) {
+                channel.pipeline().addFirst(ZLIB_DECODER, decoder);
+                channel.pipeline().addFirst(ZLIB_ENCODER, encoder);
+            } else {
+                channel.pipeline().addAfter(SSL_HANDLER, ZLIB_DECODER, decoder);
+                channel.pipeline().addAfter(SSL_HANDLER, ZLIB_ENCODER, encoder);
+            }
 
-        channel.config().setAutoRead(true);
+            channel.config().setAutoRead(true);
+        });
 
         return true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/09: JAMES-3737 Avoid a race condition upon STARTTLS

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b80c0ed81a2a732d3b6f275cc130803404766305
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue May 3 09:24:12 2022 +0700

    JAMES-3737 Avoid a race condition upon STARTTLS
    
    STARTTLS response and activation was sent from
    the event loop causing them to be sent and
    activated prior earlier responses. This causes
    a race condition when several commands are
    batched.
---
 .../java/org/apache/james/imap/api/process/ImapSession.java |  2 +-
 .../java/org/apache/james/imap/encode/FakeImapSession.java  |  2 +-
 .../org/apache/james/imap/processor/StartTLSProcessor.java  |  3 +--
 .../org/apache/james/imapserver/netty/NettyImapSession.java | 13 ++++++++-----
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
index 62fd6bf599..d0d0e9359d 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/api/process/ImapSession.java
@@ -159,7 +159,7 @@ public interface ImapSession extends CommandDetectionSession {
      * 
      * @return true if the encryption of the session was successfully
      */
-    boolean startTLS();
+    boolean startTLS(Runnable runnable);
 
     /**
      * Return true if the session is bound to a TLS encrypted socket.
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
index 776d98b205..2e53993579 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/encode/FakeImapSession.java
@@ -130,7 +130,7 @@ public class FakeImapSession implements ImapSession {
     }
     
     @Override
-    public boolean startTLS() {
+    public boolean startTLS(Runnable runnable) {
         return false;
     }
 
diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
index c5e46b4a7e..7882e9d69a 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/StartTLSProcessor.java
@@ -50,8 +50,7 @@ public class StartTLSProcessor extends AbstractProcessor<StartTLSRequest> implem
     protected Mono<Void> doProcess(StartTLSRequest request, Responder responder, ImapSession session) {
         return Mono.fromRunnable(() -> {
             if (session.supportStartTLS()) {
-                responder.respond(factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS));
-                session.startTLS();
+                session.startTLS(() -> responder.respond(factory.taggedOk(request.getTag(), request.getCommand(), HumanReadableText.STARTTLS)));
             } else {
                 responder.respond(factory.taggedBad(request.getTag(), request.getCommand(), HumanReadableText.UNKNOWN_COMMAND));
             }
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
index d294c963c5..5e2dd29cf8 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyImapSession.java
@@ -162,15 +162,18 @@ public class NettyImapSession implements ImapSession, NettyConstants {
     }
 
     @Override
-    public boolean startTLS() {
+    public boolean startTLS(Runnable runnable) {
         if (!supportStartTLS()) {
             return false;
         }
-        channel.config().setAutoRead(false);
+        channel.eventLoop().execute(() -> {
+            channel.config().setAutoRead(false);
+            runnable.run();
 
-        channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
-        stopDetectingCommandInjection();
-        channel.config().setAutoRead(true);
+            channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
+            stopDetectingCommandInjection();
+            channel.config().setAutoRead(true);
+        });
 
         return true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org