You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/13 07:23:04 UTC

[james-project] branch master updated (c773dccb07 -> b077831a5b)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from c773dccb07 JAMES-3899 JavaDoc for StorageDirective
     new 1eb2f52ded [FIX] IMAP APPEND file deletion should be done on boundedElastic
     new a10d2e8300 [ENHANCEMENT] NettyStreamImapRequestLineReader: Remove no longer relevant TODO
     new f71681a87c [ENHANCEMENT] NettyStreamImapRequestLineReader: Buffer command parsing
     new a453247026 [PERF] Schedule Data chunker on boundedElastic
     new 57f1f7a979 [PERF] S3BlobStoreDAO: Open/close inout streams on elastic threads
     new a3f253e92c [PERF] CassandraMessageDAOV3: save headers on boundedElastic
     new b077831a5b [PERF] ImapRequestFrameDecoder: move file creation out of the Netty event loop

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cassandra/mail/CassandraMessageDAOV3.java      |  47 +++---
 .../cassandra/mail/CassandraMessageMapper.java     |   9 +-
 .../blob/objectstorage/aws/S3BlobStoreDAO.java     |   5 +-
 .../java/org/apache/james/util/DataChunker.java    |   5 +-
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 166 ++++++++++++++-------
 .../netty/NettyStreamImapRequestLineReader.java    |  18 ++-
 6 files changed, 157 insertions(+), 93 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 06/07: [PERF] CassandraMessageDAOV3: save headers on boundedElastic

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a3f253e92c920635d25a99e775831acaf664fb7d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:46:31 2023 +0700

    [PERF] CassandraMessageDAOV3: save headers on boundedElastic
    
    Those are blocking operations for large files, and were performed on the
    Cassandra driver event loop.
---
 .../cassandra/mail/CassandraMessageDAOV3.java      | 47 +++++++++++-----------
 .../cassandra/mail/CassandraMessageMapper.java     |  9 ++---
 2 files changed, 26 insertions(+), 30 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
index 575c8852fd..fe45b058e8 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java
@@ -70,7 +70,6 @@ import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.ByteContent;
 import org.apache.james.mailbox.model.Cid;
@@ -99,6 +98,7 @@ import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAOV3 {
@@ -178,7 +178,7 @@ public class CassandraMessageDAOV3 {
             .build());
     }
 
-    public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) throws MailboxException {
+    public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) {
         return saveContent(message)
             .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)).thenReturn(pair));
     }
@@ -214,32 +214,31 @@ public class CassandraMessageDAOV3 {
 
     }
 
-    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException {
-        try {
-            byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets());
-            ByteSource bodyByteSource = new ByteSource() {
-                @Override
-                public InputStream openStream() {
-                    try {
-                        return message.getBodyContent();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
+    private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) {
+        return Mono.fromCallable(() -> IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets()))
+            .subscribeOn(Schedulers.boundedElastic())
+            .flatMap(headerContent -> {
+                ByteSource bodyByteSource = new ByteSource() {
+                    @Override
+                    public InputStream openStream() {
+                        try {
+                            return message.getBodyContent();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
                     }
-                }
 
-                @Override
-                public long size() {
-                    return message.getBodyOctets();
-                }
-            };
+                    @Override
+                    public long size() {
+                        return message.getBodyOctets();
+                    }
+                };
 
-            Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
-            Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
+                Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED));
+                Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST));
 
-            return headerFuture.zipWith(bodyFuture);
-        } catch (IOException e) {
-            throw new MailboxException("Error saving mail content", e);
-        }
+                return headerFuture.zipWith(bodyFuture);
+            });
     }
 
     private BoundStatement boundWriteStatement(MailboxMessage message, Tuple2<BlobId, BlobId> pair) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 5f6924ae0b..04ed2cab13 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -75,7 +75,6 @@ import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Streams;
@@ -405,10 +404,8 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return addUidAndModseqAndSaveDate(message, mailboxId)
-            .flatMap(Throwing.function((MailboxMessage messageWithUidAndModSeq) ->
-                save(mailbox, messageWithUidAndModSeq)
-                    .thenReturn(messageWithUidAndModSeq)))
-            .map(MailboxMessage::metaData);
+            .flatMap(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
+                .thenReturn(messageWithUidAndModSeq.metaData()));
     }
 
     private Mono<MailboxMessage> addUidAndModseqAndSaveDate(MailboxMessage message, CassandraId mailboxId) {
@@ -620,7 +617,7 @@ public class CassandraMessageMapper implements MessageMapper {
             .map(MailboxMessage::metaData);
     }
 
-    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {
+    private Mono<Void> save(Mailbox mailbox, MailboxMessage message) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return messageDAOV3.save(message)
             .flatMap(headerAndBodyBlobIds -> insertIds(message, mailboxId, headerAndBodyBlobIds.getT1()));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/07: [ENHANCEMENT] NettyStreamImapRequestLineReader: Buffer command parsing

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f71681a87c491fda6a47db6949a5c2e7043303af
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:43:25 2023 +0700

    [ENHANCEMENT] NettyStreamImapRequestLineReader: Buffer command parsing
    
    The first line of the IMAP request is parsed char by char. This generates uneeded
    small operations on the file system when parsing the first line of the APPEND command.
    Buffer it instead.
---
 .../james/imapserver/netty/NettyStreamImapRequestLineReader.java       | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
index a5b189dab5..67f922c7c4 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
@@ -30,6 +30,7 @@ import org.apache.james.imap.api.display.HumanReadableText;
 import org.apache.james.imap.decode.DecodingException;
 import org.apache.james.imap.message.Literal;
 import org.apache.james.imap.utils.EolInputStream;
+import org.apache.james.util.io.UnsynchronizedBufferedInputStream;
 
 import com.github.fge.lambdas.Throwing;
 import com.google.common.io.ByteStreams;
@@ -87,7 +88,7 @@ public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLi
         super(channel, retry);
         this.backingFile = file;
         try {
-            this.in = new CountingInputStream(new FileInputStream(file));
+            this.in = new CountingInputStream(new UnsynchronizedBufferedInputStream(new FileInputStream(file)));
         } catch (FileNotFoundException e) {
             throw new RuntimeException(e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/07: [PERF] Schedule Data chunker on boundedElastic

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a453247026d18390e518f33f2d9bc62f2feced73
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:44:09 2023 +0700

    [PERF] Schedule Data chunker on boundedElastic
    
    Reading an inputstream is blocking.
---
 .../util/src/main/java/org/apache/james/util/DataChunker.java        | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
index 9dea8e9050..0db274d32f 100644
--- a/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
+++ b/server/container/util/src/main/java/org/apache/james/util/DataChunker.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class DataChunker {
 
@@ -58,8 +59,7 @@ public class DataChunker {
         Preconditions.checkNotNull(data);
         Preconditions.checkArgument(chunkSize > 0, CHUNK_SIZE_MUST_BE_STRICTLY_POSITIVE);
         UnsynchronizedBufferedInputStream bufferedInputStream = new UnsynchronizedBufferedInputStream(data);
-        return Flux
-            .<ByteBuffer>generate(sink -> {
+        return Flux.<ByteBuffer>generate(sink -> {
                 try {
                     byte[] buffer = new byte[chunkSize];
 
@@ -73,6 +73,7 @@ public class DataChunker {
                     sink.error(e);
                 }
             })
+            .subscribeOn(Schedulers.boundedElastic())
             .defaultIfEmpty(ByteBuffer.wrap(new byte[0]));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/07: [ENHANCEMENT] NettyStreamImapRequestLineReader: Remove no longer relevant TODO

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a10d2e83001251f3d8cacefde1eac068a8b1fa7b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:41:56 2023 +0700

    [ENHANCEMENT] NettyStreamImapRequestLineReader: Remove no longer relevant TODO
---
 .../james/imapserver/netty/NettyStreamImapRequestLineReader.java     | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
index e0c67c413e..a5b189dab5 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
@@ -106,7 +106,6 @@ public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLi
      */
     @Override
     public char nextChar() throws DecodingException {
-        
         if (!nextSeen) {
             int next;
             try {
@@ -127,8 +126,7 @@ public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLi
 
     /**
      * Reads and consumes a number of characters from the underlying reader,
-     * filling the char array provided. TODO: remove unnecessary copying of
-     * bits; line reader should maintain an internal ByteBuffer;
+     * filling the char array provided.
      * 
      * @param size
      *            number of characters to read and consume
@@ -144,7 +142,6 @@ public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLi
         nextSeen = false;
         nextChar = 0;
 
-        //TODO move this copy in netty stack and try to avoid it
         try {
             long offset = in.getCount();
             in.skip(size);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/07: [PERF] S3BlobStoreDAO: Open/close inout streams on elastic threads

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 57f1f7a979307577d5188aa32b99415262ccde6c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 13:45:11 2023 +0700

    [PERF] S3BlobStoreDAO: Open/close inout streams on elastic threads
    
    Blocking operations where performed on the Cassandra driver event loop.
---
 .../java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
index 564a8a8381..ba85e2375e 100644
--- a/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
+++ b/server/blob/blob-s3/src/main/java/org/apache/james/blob/objectstorage/aws/S3BlobStoreDAO.java
@@ -305,10 +305,9 @@ public class S3BlobStoreDAO implements BlobStoreDAO, Startable, Closeable {
 
         return Mono.fromCallable(content::size)
             .flatMap(contentLength ->
-                Mono.using(content::openStream,
+                Mono.usingWhen(Mono.fromCallable(content::openStream).subscribeOn(Schedulers.boundedElastic()),
                     stream -> save(resolvedBucketName, blobId, stream, contentLength),
-                    Throwing.consumer(InputStream::close),
-                    LAZY))
+                    stream -> Mono.fromRunnable(Throwing.runnable(stream::close))))
             .retryWhen(createBucketOnRetry(resolvedBucketName))
             .onErrorMap(IOException.class, e -> new ObjectStoreIOException("Error saving blob", e))
             .onErrorMap(SdkClientException.class, e -> new ObjectStoreIOException("Error saving blob", e))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/07: [FIX] IMAP APPEND file deletion should be done on boundedElastic

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1eb2f52ded57f6c7e5b45e380483c0f2324baaba
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 10:25:49 2023 +0700

    [FIX] IMAP APPEND file deletion should be done on boundedElastic
---
 .../imapserver/netty/NettyStreamImapRequestLineReader.java     | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
index 57069c7576..e0c67c413e 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyStreamImapRequestLineReader.java
@@ -24,19 +24,23 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Files;
 
 import org.apache.james.imap.api.display.HumanReadableText;
 import org.apache.james.imap.decode.DecodingException;
 import org.apache.james.imap.message.Literal;
 import org.apache.james.imap.utils.EolInputStream;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingInputStream;
 
 import io.netty.channel.Channel;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLineReader implements Closeable {
-    private class FileLiteral implements Literal, Closeable {
+    private static class FileLiteral implements Literal, Closeable {
         private final long offset;
         private final int size;
         private final boolean extraCRLF;
@@ -53,7 +57,9 @@ public class NettyStreamImapRequestLineReader extends AbstractNettyImapRequestLi
 
         @Override
         public void close() {
-            file.delete();
+            Mono.fromRunnable(Throwing.runnable(() -> Files.delete(file.toPath())))
+                .subscribeOn(Schedulers.boundedElastic())
+                .subscribe();
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 07/07: [PERF] ImapRequestFrameDecoder: move file creation out of the Netty event loop

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b077831a5b28cf5319f6d23fb02bf1203281189b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 12 15:23:45 2023 +0700

    [PERF] ImapRequestFrameDecoder: move file creation out of the Netty event loop
    
    IMAP implementation buffers large mails into files. Priori to this change,
    the file creation was performed on the Netty event loop thread. This is
    problematic as file creation is blocking, it would result into the Netty
    event loop stalling, especially when the file system is overloaded.
    
    Also, in case of errors / connection closure during the buffering,
    we want to ensure that the intermediate temporary file is well
    disposed of. (not the case previously)
---
 .../imapserver/netty/ImapRequestFrameDecoder.java  | 166 ++++++++++++++-------
 1 file changed, 115 insertions(+), 51 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index 02505969a2..4adab34f09 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -34,6 +34,8 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.imap.api.ImapMessage;
@@ -44,6 +46,7 @@ import org.apache.james.imap.decode.ImapDecoder;
 import org.apache.james.imap.decode.ImapRequestLineReader;
 import org.apache.james.protocols.netty.LineHandlerAware;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.buffer.ByteBuf;
@@ -53,6 +56,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
 import reactor.core.scheduler.Schedulers;
 
@@ -64,13 +68,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
     @VisibleForTesting
     static final String NEEDED_DATA = "NEEDED_DATA";
     private static final boolean RETRY = true;
-    private static final String STORED_DATA = "STORED_DATA";
-    private static final String WRITTEN_DATA = "WRITTEN_DATA";
-    private static final String OUTPUT_STREAM = "OUTPUT_STREAM";
     private static final String SINK = "SINK";
     private static final String SUBSCRIPTION = "SUBSCRIPTION";
 
-
     private final ImapDecoder decoder;
     private final int inMemorySizeLimit;
     private final int literalSizeLimit;
@@ -91,6 +91,16 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
         super.channelActive(ctx);
     }
 
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        Object subscription = ctx.channel().attr(FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY).get()
+            .get(SUBSCRIPTION);
+        if (subscription instanceof Disposable) {
+            ((Disposable) subscription).dispose();
+        }
+        super.channelInactive(ctx);
+    }
+
     @Override
     @SuppressWarnings("unchecked")
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@@ -195,73 +205,127 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
     }
 
     private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size, int readerIndex) throws IOException {
-        final File f;
         Sinks.Many<byte[]> sink;
 
-        OutputStream outputStream;
         // check if we have created a temporary file already or if
         // we need to create a new one
-        if (attachment.containsKey(STORED_DATA)) {
+        if (attachment.containsKey(SINK)) {
             sink = (Sinks.Many<byte[]>) attachment.get(SINK);
         } else {
-            f = Files.createTempFile("imap-literal", ".tmp").toFile();
-            attachment.put(STORED_DATA, f);
-            final AtomicInteger written = new AtomicInteger(0);
-            attachment.put(WRITTEN_DATA, written);
-            outputStream = new FileOutputStream(f, true);
-            attachment.put(OUTPUT_STREAM, outputStream);
             sink = Sinks.many().unicast().onBackpressureBuffer();
             attachment.put(SINK, sink);
 
-            Disposable subscribe = sink.asFlux()
-                .publishOn(Schedulers.boundedElastic())
-                .doOnNext(next -> {
-                    try {
-                        int amount = Math.min(next.length, size - written.get());
-                        outputStream.write(next, 0, amount);
-                        written.addAndGet(amount);
-                    } catch (Exception e) {
-                        try {
-                            outputStream.close();
-                        } catch (IOException ignored) {
-                            //ignore exception during close
-                        }
-                        throw new RuntimeException(e);
-                    }
+            FileChunkConsumer fileChunkConsumer = new FileChunkConsumer(size,
+                (file, written) -> {
+                    ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), file, RETRY);
 
-                    // Check if all needed data was streamed to the file.
-                    if (written.get() == size) {
-                        try {
-                            outputStream.close();
-                        } catch (IOException ignored) {
-                            //ignore exception during close
-                        }
-
-                        ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
-
-                        try {
-                            parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()), readerIndex)
-                                .ifPresent(ctx::fireChannelRead);
-                        } catch (DecodingException e) {
-                            ctx.fireExceptionCaught(e);
-                        }
+                    try {
+                        parseImapMessage(ctx, null, attachment, Pair.of(reader, size), readerIndex)
+                            .ifPresent(ctx::fireChannelRead);
+                    } catch (DecodingException e) {
+                        ctx.fireExceptionCaught(e);
                     }
-                })
-                .subscribe(o -> {
-
+                });
+            Disposable subscribe = sink.asFlux()
+                .publishOn(Schedulers.boundedElastic())
+                .subscribe(fileChunkConsumer,
+                    e -> {
+                        fileChunkConsumer.discard();
+                        ctx.fireExceptionCaught(e);
                     },
-                    ctx::fireExceptionCaught,
                     () -> {
 
                     });
-            attachment.put(SUBSCRIPTION, subscribe);
+            attachment.put(SUBSCRIPTION, (Disposable) () -> {
+                subscribe.dispose();
+                fileChunkConsumer.discard();
+            });
         }
-        final int readableBytes = in.readableBytes();
-        final byte[] bytes = new byte[readableBytes];
+        int readableBytes = in.readableBytes();
+        byte[] bytes = new byte[readableBytes];
         in.readBytes(bytes);
         sink.emitNext(bytes, FAIL_FAST);
     }
 
+    static class FileChunkConsumer implements Consumer<byte[]> {
+        private final int size;
+        private final AtomicInteger written = new AtomicInteger(0);
+        private final BiConsumer<File, Integer> callback;
+        private final AtomicBoolean initialized = new AtomicBoolean(false);
+        private OutputStream outputStream;
+        private File f;
+
+        FileChunkConsumer(int size, BiConsumer<File, Integer> callback) {
+            this.size = size;
+            this.callback = callback;
+        }
+
+        @Override
+        public void accept(byte[] next) {
+            if (!initialized.get()) {
+                initialize();
+            }
+
+            writeChunk(next);
+
+            // Check if all needed data was streamed to the file.
+            if (isComplete()) {
+                finalizeDataTransfer();
+            }
+        }
+
+        private void initialize() {
+            try {
+                f = Files.createTempFile("imap-literal", ".tmp").toFile();
+                outputStream = new FileOutputStream(f, true);
+                initialized.set(true);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private void writeChunk(byte[] next) {
+            try {
+                int amount = Math.min(next.length, size - written.get());
+                outputStream.write(next, 0, amount);
+                written.addAndGet(amount);
+            } catch (Exception e) {
+                try {
+                    outputStream.close();
+                } catch (IOException ignored) {
+                    //ignore exception during close
+                }
+                throw new RuntimeException(e);
+            }
+        }
+
+        private boolean isComplete() {
+            return written.get() == size;
+        }
+
+        private void finalizeDataTransfer() {
+            try {
+                outputStream.close();
+            } catch (IOException ignored) {
+                //ignore exception during close
+            }
+
+            callback.accept(f, written.get());
+        }
+
+        void discard() {
+            Mono.fromRunnable(Throwing.runnable(() -> {
+                if (outputStream != null) {
+                    outputStream.close();
+                }
+                if (f != null) {
+                    Files.delete(f.toPath());
+                }
+            })).subscribeOn(Schedulers.boundedElastic())
+                .subscribe();
+        }
+    }
+
     public void disableFraming(ChannelHandlerContext ctx) {
         if (framingEnabled.getAndSet(false)) {
             ctx.channel().pipeline().remove(FRAMER);


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org