You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/09 01:35:33 UTC

[james-project] 03/09: JAMES-3737 Copy large APPEND to a file asynchronously

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e0017446c6373fdbc229dbc01ed463e1c8d3f1eb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 11:23:21 2022 +0700

    JAMES-3737 Copy large APPEND to a file asynchronously
    
    This enable executing all IMAP requests on the event loop
---
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 100 +++++++++++++--------
 1 file changed, 62 insertions(+), 38 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index ab375b356e..fd4e2eec8a 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.imapserver.netty;
 
+import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -30,6 +32,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.imap.api.ImapMessage;
@@ -48,6 +51,9 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import reactor.core.Disposable;
+import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
 
 
 /**
@@ -60,6 +66,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
     private static final String STORED_DATA = "STORED_DATA";
     private static final String WRITTEN_DATA = "WRITTEN_DATA";
     private static final String OUTPUT_STREAM = "OUTPUT_STREAM";
+    private static final String SINK = "SINK";
+    private static final String SUBSCRIPTION = "SUBSCRIPTION";
+
 
     private final ImapDecoder decoder;
     private final int inMemorySizeLimit;
@@ -169,8 +178,8 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
 
                     // ok seems like it will not fit in the memory limit so we
                     // need to store it in a temporary file
-                    reader = uploadToAFile(ctx, in, attachment, size);
-                    if (reader == null) return null;
+                    uploadToAFile(ctx, in, attachment, size);
+                    return null;
 
                 } else {
                     in.resetReaderIndex();
@@ -187,55 +196,70 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         return Pair.of(reader, size);
     }
 
-    private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
-        ImapRequestLineReader reader;
+    private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
         final File f;
-        int written;
+        Sinks.Many<byte[]> sink;
 
         OutputStream outputStream;
         // check if we have created a temporary file already or if
         // we need to create a new one
         if (attachment.containsKey(STORED_DATA)) {
-            f = (File) attachment.get(STORED_DATA);
-            written = (Integer) attachment.get(WRITTEN_DATA);
-            outputStream = (OutputStream) attachment.get(OUTPUT_STREAM);
+            sink = (Sinks.Many<byte[]>) attachment.get(SINK);
         } else {
             f = File.createTempFile("imap-literal", ".tmp");
             attachment.put(STORED_DATA, f);
-            written = 0;
+            final AtomicInteger written = new AtomicInteger(0);
             attachment.put(WRITTEN_DATA, written);
             outputStream = new FileOutputStream(f, true);
             attachment.put(OUTPUT_STREAM, outputStream);
-
-        }
-
-
-        try {
-            int amount = Math.min(in.readableBytes(), size - written);
-            in.readBytes(outputStream, amount);
-            written += amount;
-        } catch (Exception e) {
-            try {
-                outputStream.close();
-            } catch (IOException ignored) {
-                //ignore exception during close
-            }
-            throw e;
-        }
-        // Check if all needed data was streamed to the file.
-        if (written == size) {
-            try {
-                outputStream.close();
-            } catch (IOException ignored) {
-                //ignore exception during close
-            }
-
-            reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
-        } else {
-            attachment.put(WRITTEN_DATA, written);
-            return null;
+            sink = Sinks.many().unicast().onBackpressureBuffer();
+            attachment.put(SINK, sink);
+
+            Disposable subscribe = sink.asFlux()
+                .publishOn(Schedulers.elastic())
+                .doOnNext(next -> {
+                    try {
+                        int amount = Math.min(next.length, size - written.get());
+                        outputStream.write(next, 0, amount);
+                        written.addAndGet(amount);
+                    } catch (Exception e) {
+                        try {
+                            outputStream.close();
+                        } catch (IOException ignored) {
+                            //ignore exception during close
+                        }
+                        throw new RuntimeException(e);
+                    }
+
+                    // Check if all needed data was streamed to the file.
+                    if (written.get() == size) {
+                        try {
+                            outputStream.close();
+                        } catch (IOException ignored) {
+                            //ignore exception during close
+                        }
+
+                        ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
+
+                        try {
+                            parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()))
+                                .ifPresent(ctx::fireChannelRead);
+                        } catch (DecodingException e) {
+                            ctx.fireExceptionCaught(e);
+                        }
+                    }
+                })
+                .subscribe(o -> {},
+                    ctx::fireExceptionCaught,
+                    () -> {
+
+                    });
+            attachment.put(SUBSCRIPTION, subscribe);
         }
-        return reader;
+        final int readableBytes = in.readableBytes();
+        final byte[] bytes = new byte[readableBytes];
+        in.readBytes(bytes);
+        sink.emitNext(bytes, FAIL_FAST);
     }
 
     public void disableFraming(ChannelHandlerContext ctx) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org