You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/19 07:44:49 UTC

[james-project] 01/02: [FIX] Reactify attachments

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a3fed7cc07c5591e582a00702ef667bcdacc6586
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 14 18:18:27 2023 +0700

    [FIX] Reactify attachments
    
    This avoids making blocking calls on a parallel thread,
    or on a Cassandra driver thread...
---
 .../james/mailbox/AttachmentContentLoader.java     |  7 ++++++
 .../apache/james/mailbox/AttachmentManager.java    |  9 ++++++++
 .../cassandra/mail/CassandraAttachmentMapper.java  | 14 +++++++++++
 .../mailbox/store/StoreAttachmentManager.java      | 24 +++++++++++++++++++
 .../james/mailbox/store/mail/AttachmentMapper.java | 10 ++++++++
 .../james/jmap/draft/methods/BlobManagerImpl.java  | 27 ++++++++++++++++++----
 .../org/apache/james/jmap/draft/model/Blob.java    | 12 ++++++++++
 .../org/apache/james/jmap/http/DownloadRoutes.java |  6 +++--
 .../jmap/draft/methods/BlobManagerImplTest.java    |  5 ++--
 .../apache/james/jmap/routes/DownloadRoutes.scala  |  5 ++--
 10 files changed, 108 insertions(+), 11 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
index 9536c722a1..5417a56197 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
@@ -24,9 +24,16 @@ import java.io.InputStream;
 
 import org.apache.james.mailbox.exception.AttachmentNotFoundException;
 import org.apache.james.mailbox.model.AttachmentMetadata;
+import org.apache.james.util.ReactorUtils;
+
+import reactor.core.publisher.Mono;
 
 public interface AttachmentContentLoader {
 
     InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException;
 
+    default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+        return Mono.fromCallable(() -> load(attachment, mailboxSession))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
 }
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index 5ed6de1ad3..2387ce722c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -28,6 +28,8 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.AttachmentMetadata;
 
+import reactor.core.publisher.Mono;
+
 public interface AttachmentManager extends AttachmentContentLoader {
 
     boolean exists(AttachmentId attachmentId, MailboxSession session) throws MailboxException;
@@ -38,9 +40,16 @@ public interface AttachmentManager extends AttachmentContentLoader {
 
     InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
 
+    Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession);
+
     @Override
     default InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException {
         return loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
     }
 
+    @Override
+    default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+        return loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession);
+    }
+
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index a1200efc53..b1b3e16fb0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -72,6 +72,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
             .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId()));
     }
 
+    @Override
+    public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+        Preconditions.checkArgument(attachmentId != null);
+        return getAttachmentInternal(attachmentId)
+            .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId())));
+    }
+
     @Override
     public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
@@ -89,6 +96,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
             .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString()));
     }
 
+    @Override
+    public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+        return attachmentDAOV2.getAttachment(attachmentId, messageIdFallback(attachmentId))
+            .flatMap(daoAttachment -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST)))
+            .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString())));
+    }
+
     private Mono<CassandraMessageId> messageIdFallback(AttachmentId attachmentId) {
         return attachmentMessageIdDAO.getOwnerMessageIds(attachmentId)
             .map(CassandraMessageId.class::cast)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index c095c9a0e4..b11ee9eccc 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public class StoreAttachmentManager implements AttachmentManager {
     private final AttachmentMapperFactory attachmentMapperFactory;
     private final MessageIdManager messageIdManager;
@@ -55,10 +57,21 @@ public class StoreAttachmentManager implements AttachmentManager {
         return exists(attachment, session);
     }
 
+    public Mono<Boolean> existsReactive(AttachmentId attachmentId, MailboxSession session) {
+        return attachmentMapperFactory.getAttachmentMapper(session)
+            .getAttachmentReactive(attachmentId)
+            .flatMap(attachment -> existsReactive(attachment, session));
+    }
+
     public boolean exists(AttachmentMetadata attachment, MailboxSession session) throws MailboxException {
         return !messageIdManager.accessibleMessages(ImmutableList.of(attachment.getMessageId()), session).isEmpty();
     }
 
+    public Mono<Boolean> existsReactive(AttachmentMetadata attachment, MailboxSession session) {
+        return Mono.from(messageIdManager.accessibleMessagesReactive(ImmutableList.of(attachment.getMessageId()), session))
+            .map(accessibleMessages -> !accessibleMessages.isEmpty());
+    }
+
     @Override
     public AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException {
         AttachmentMetadata attachment = attachmentMapperFactory.getAttachmentMapper(mailboxSession).getAttachment(attachmentId);
@@ -94,4 +107,15 @@ public class StoreAttachmentManager implements AttachmentManager {
         }
         return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContent(attachmentId);
     }
+
+    @Override
+    public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) {
+        return existsReactive(attachmentId, mailboxSession)
+            .flatMap(exist -> {
+                    if (!exist) {
+                        return Mono.error(new AttachmentNotFoundException(attachmentId.getId()));
+                    }
+                    return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContentReactive(attachmentId);
+                });
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
index 10967da25c..f566aa5f49 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
@@ -39,8 +39,18 @@ public interface AttachmentMapper extends Mapper {
 
     InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException;
 
+    default Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+        return Mono.fromCallable(() -> loadAttachmentContent(attachmentId))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
+
     AttachmentMetadata getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException;
 
+    default Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+        return Mono.fromCallable(() -> getAttachment(attachmentId))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
+
     List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds);
 
     List<MessageAttachmentMetadata> storeAttachments(Collection<ParsedAttachment> attachments, MessageId ownerMessageId) throws MailboxException;
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
index c3e1d6d34e..5d7211d4b5 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -156,11 +158,26 @@ public class BlobManagerImpl implements BlobManager {
         BlobId blobId = BlobId.of(attachment.getAttachmentId());
         return Blob.builder()
             .id(blobId)
-            .payload(() -> {
-                try {
-                    return attachmentManager.loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
-                } catch (AttachmentNotFoundException e) {
-                    throw new BlobNotFoundException(blobId, e);
+            .payload(new Blob.InputStreamSupplier() {
+                @Override
+                public InputStream load() throws IOException, BlobNotFoundException {
+                    try {
+                        return loadReactive().block();
+                    } catch (RuntimeException e) {
+                        if (e.getCause() instanceof IOException) {
+                            throw (IOException) e.getCause();
+                        }
+                        if (e.getCause() instanceof BlobNotFoundException) {
+                            throw (BlobNotFoundException) e.getCause();
+                        }
+                        throw e;
+                    }
+                }
+
+                @Override
+                public Mono<InputStream> loadReactive() {
+                    return attachmentManager.loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession)
+                        .onErrorResume(AttachmentNotFoundException.class, e -> Mono.error(new BlobNotFoundException(blobId, e)));
                 }
             })
             .size(attachment.getSize())
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
index a4bae5353c..9e718f1ebb 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
@@ -25,11 +25,14 @@ import java.util.Objects;
 
 import org.apache.james.jmap.draft.exceptions.BlobNotFoundException;
 import org.apache.james.mailbox.model.ContentType;
+import org.apache.james.util.ReactorUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class Blob {
 
     @FunctionalInterface
@@ -40,6 +43,11 @@ public class Blob {
          * The caller is responsible of closing it.
          */
         InputStream load() throws IOException, BlobNotFoundException;
+
+        default Mono<InputStream> loadReactive() {
+            return Mono.fromCallable(this::load)
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+        }
     }
 
     public static class Builder {
@@ -111,6 +119,10 @@ public class Blob {
         return payload.load();
     }
 
+    public Mono<InputStream> getStreamReactive() {
+        return payload.loadReactive();
+    }
+
     public long getSize() {
         return size;
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
index 9c55521dd9..40231a7850 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
@@ -213,11 +213,13 @@ public class DownloadRoutes implements JMAPRoutes {
         String blobId = downloadPath.getBlobId();
 
         return Mono.from(blobManager.retrieve(ImmutableList.of(BlobId.of(blobId)), mailboxSession))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
             .switchIfEmpty(Mono.error(() -> new BlobNotFoundException(BlobId.of(blobId))))
             .flatMap(blob -> Mono.usingWhen(
-                Mono.fromCallable(blob::getStream),
+                blob.getStreamReactive(),
                 stream -> downloadBlob(downloadPath.getName(), response, blob.getSize(), blob.getContentType(), stream),
-                stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow())))
+                stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow()))
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
             .onErrorResume(BlobNotFoundException.class, e -> {
                 LOGGER.info("Attachment '{}' not found", blobId, e);
                 return response.status(NOT_FOUND).send();
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
index 1b412653ac..5fdf8d096e 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
@@ -63,6 +63,7 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 class BlobManagerImplTest {
     static final String ID = "abc";
@@ -97,8 +98,8 @@ class BlobManagerImplTest {
                 .size(BYTES.length)
                 .type(CONTENT_TYPE)
                 .build());
-        when(attachmentManager.loadAttachmentContent(ATTACHMENT_ID, session))
-            .thenReturn(new ByteArrayInputStream(BYTES));
+        when(attachmentManager.loadAttachmentContentReactive(ATTACHMENT_ID, session))
+            .thenReturn(Mono.just(new ByteArrayInputStream(BYTES)));
 
         Blob blob = blobManager.retrieve(BLOB_ID_ATTACHMENT, session);
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 8b3446bd55..148111a084 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -178,8 +178,9 @@ class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager)
     AttachmentId.from(blobId.value.value) match {
       case attachmentId: AttachmentId =>
         Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match {
-          case Success(attachmentMetadata) => Applicable(
-            SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession))))
+          case Success(attachmentMetadata) =>
+            Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession))
+              .map(content => AttachmentBlob(attachmentMetadata, content)))
           case Failure(_) => NonApplicable
         }
       case _ => NonApplicable


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org