You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/09 01:35:33 UTC
[james-project] 03/09: JAMES-3737 Copy large APPEND to a file asynchronously
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e0017446c6373fdbc229dbc01ed463e1c8d3f1eb
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 22 11:23:21 2022 +0700
JAMES-3737 Copy large APPEND to a file asynchronously
This enable executing all IMAP requests on the event loop
---
.../imapserver/netty/ImapRequestFrameDecoder.java | 100 +++++++++++++--------
1 file changed, 62 insertions(+), 38 deletions(-)
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
index ab375b356e..fd4e2eec8a 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapRequestFrameDecoder.java
@@ -19,6 +19,8 @@
package org.apache.james.imapserver.netty;
+import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -30,6 +32,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.imap.api.ImapMessage;
@@ -48,6 +51,9 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.ByteToMessageDecoder;
+import reactor.core.Disposable;
+import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
/**
@@ -60,6 +66,9 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
private static final String STORED_DATA = "STORED_DATA";
private static final String WRITTEN_DATA = "WRITTEN_DATA";
private static final String OUTPUT_STREAM = "OUTPUT_STREAM";
+ private static final String SINK = "SINK";
+ private static final String SUBSCRIPTION = "SUBSCRIPTION";
+
private final ImapDecoder decoder;
private final int inMemorySizeLimit;
@@ -169,8 +178,8 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
// ok seems like it will not fit in the memory limit so we
// need to store it in a temporary file
- reader = uploadToAFile(ctx, in, attachment, size);
- if (reader == null) return null;
+ uploadToAFile(ctx, in, attachment, size);
+ return null;
} else {
in.resetReaderIndex();
@@ -187,55 +196,70 @@ public class ImapRequestFrameDecoder extends ByteToMessageDecoder implements Net
return Pair.of(reader, size);
}
- private ImapRequestLineReader uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
- ImapRequestLineReader reader;
+ private void uploadToAFile(ChannelHandlerContext ctx, ByteBuf in, Map<String, Object> attachment, int size) throws IOException {
final File f;
- int written;
+ Sinks.Many<byte[]> sink;
OutputStream outputStream;
// check if we have created a temporary file already or if
// we need to create a new one
if (attachment.containsKey(STORED_DATA)) {
- f = (File) attachment.get(STORED_DATA);
- written = (Integer) attachment.get(WRITTEN_DATA);
- outputStream = (OutputStream) attachment.get(OUTPUT_STREAM);
+ sink = (Sinks.Many<byte[]>) attachment.get(SINK);
} else {
f = File.createTempFile("imap-literal", ".tmp");
attachment.put(STORED_DATA, f);
- written = 0;
+ final AtomicInteger written = new AtomicInteger(0);
attachment.put(WRITTEN_DATA, written);
outputStream = new FileOutputStream(f, true);
attachment.put(OUTPUT_STREAM, outputStream);
-
- }
-
-
- try {
- int amount = Math.min(in.readableBytes(), size - written);
- in.readBytes(outputStream, amount);
- written += amount;
- } catch (Exception e) {
- try {
- outputStream.close();
- } catch (IOException ignored) {
- //ignore exception during close
- }
- throw e;
- }
- // Check if all needed data was streamed to the file.
- if (written == size) {
- try {
- outputStream.close();
- } catch (IOException ignored) {
- //ignore exception during close
- }
-
- reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
- } else {
- attachment.put(WRITTEN_DATA, written);
- return null;
+ sink = Sinks.many().unicast().onBackpressureBuffer();
+ attachment.put(SINK, sink);
+
+ Disposable subscribe = sink.asFlux()
+ .publishOn(Schedulers.elastic())
+ .doOnNext(next -> {
+ try {
+ int amount = Math.min(next.length, size - written.get());
+ outputStream.write(next, 0, amount);
+ written.addAndGet(amount);
+ } catch (Exception e) {
+ try {
+ outputStream.close();
+ } catch (IOException ignored) {
+ //ignore exception during close
+ }
+ throw new RuntimeException(e);
+ }
+
+ // Check if all needed data was streamed to the file.
+ if (written.get() == size) {
+ try {
+ outputStream.close();
+ } catch (IOException ignored) {
+ //ignore exception during close
+ }
+
+ ImapRequestLineReader reader = new NettyStreamImapRequestLineReader(ctx.channel(), f, RETRY);
+
+ try {
+ parseImapMessage(ctx, null, attachment, Pair.of(reader, written.get()))
+ .ifPresent(ctx::fireChannelRead);
+ } catch (DecodingException e) {
+ ctx.fireExceptionCaught(e);
+ }
+ }
+ })
+ .subscribe(o -> {},
+ ctx::fireExceptionCaught,
+ () -> {
+
+ });
+ attachment.put(SUBSCRIPTION, subscribe);
}
- return reader;
+ final int readableBytes = in.readableBytes();
+ final byte[] bytes = new byte[readableBytes];
+ in.readBytes(bytes);
+ sink.emitNext(bytes, FAIL_FAST);
}
public void disableFraming(ChannelHandlerContext ctx) {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org