You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/19 07:44:49 UTC
[james-project] 01/02: [FIX] Reactify attachments
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a3fed7cc07c5591e582a00702ef667bcdacc6586
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 14 18:18:27 2023 +0700
[FIX] Reactify attachments
This avoids blocking calls into a parrallel thread,
or into a Cassandra driver thread...
---
.../james/mailbox/AttachmentContentLoader.java | 7 ++++++
.../apache/james/mailbox/AttachmentManager.java | 9 ++++++++
.../cassandra/mail/CassandraAttachmentMapper.java | 14 +++++++++++
.../mailbox/store/StoreAttachmentManager.java | 24 +++++++++++++++++++
.../james/mailbox/store/mail/AttachmentMapper.java | 10 ++++++++
.../james/jmap/draft/methods/BlobManagerImpl.java | 27 ++++++++++++++++++----
.../org/apache/james/jmap/draft/model/Blob.java | 12 ++++++++++
.../org/apache/james/jmap/http/DownloadRoutes.java | 6 +++--
.../jmap/draft/methods/BlobManagerImplTest.java | 5 ++--
.../apache/james/jmap/routes/DownloadRoutes.scala | 5 ++--
10 files changed, 108 insertions(+), 11 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
index 9536c722a1..5417a56197 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
@@ -24,9 +24,16 @@ import java.io.InputStream;
import org.apache.james.mailbox.exception.AttachmentNotFoundException;
import org.apache.james.mailbox.model.AttachmentMetadata;
+import org.apache.james.util.ReactorUtils;
+
+import reactor.core.publisher.Mono;
public interface AttachmentContentLoader {
InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException;
+ default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+ return Mono.fromCallable(() -> load(attachment, mailboxSession))
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ }
}
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index 5ed6de1ad3..2387ce722c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -28,6 +28,8 @@ import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.AttachmentMetadata;
+import reactor.core.publisher.Mono;
+
public interface AttachmentManager extends AttachmentContentLoader {
boolean exists(AttachmentId attachmentId, MailboxSession session) throws MailboxException;
@@ -38,9 +40,16 @@ public interface AttachmentManager extends AttachmentContentLoader {
InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
+ Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession);
+
@Override
default InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException {
return loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
}
+ @Override
+ default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+ return loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession);
+ }
+
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index a1200efc53..b1b3e16fb0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -72,6 +72,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
.orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId()));
}
+ @Override
+ public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+ Preconditions.checkArgument(attachmentId != null);
+ return getAttachmentInternal(attachmentId)
+ .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId())));
+ }
+
@Override
public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
Preconditions.checkArgument(attachmentIds != null);
@@ -89,6 +96,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
.orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString()));
}
+ @Override
+ public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+ return attachmentDAOV2.getAttachment(attachmentId, messageIdFallback(attachmentId))
+ .flatMap(daoAttachment -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST)))
+ .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString())));
+ }
+
private Mono<CassandraMessageId> messageIdFallback(AttachmentId attachmentId) {
return attachmentMessageIdDAO.getOwnerMessageIds(attachmentId)
.map(CassandraMessageId.class::cast)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index c095c9a0e4..b11ee9eccc 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
+
public class StoreAttachmentManager implements AttachmentManager {
private final AttachmentMapperFactory attachmentMapperFactory;
private final MessageIdManager messageIdManager;
@@ -55,10 +57,21 @@ public class StoreAttachmentManager implements AttachmentManager {
return exists(attachment, session);
}
+ public Mono<Boolean> existsReactive(AttachmentId attachmentId, MailboxSession session) {
+ return attachmentMapperFactory.getAttachmentMapper(session)
+ .getAttachmentReactive(attachmentId)
+ .flatMap(attachment -> existsReactive(attachment, session));
+ }
+
public boolean exists(AttachmentMetadata attachment, MailboxSession session) throws MailboxException {
return !messageIdManager.accessibleMessages(ImmutableList.of(attachment.getMessageId()), session).isEmpty();
}
+ public Mono<Boolean> existsReactive(AttachmentMetadata attachment, MailboxSession session) {
+ return Mono.from(messageIdManager.accessibleMessagesReactive(ImmutableList.of(attachment.getMessageId()), session))
+ .map(accessibleMessages -> !accessibleMessages.isEmpty());
+ }
+
@Override
public AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException {
AttachmentMetadata attachment = attachmentMapperFactory.getAttachmentMapper(mailboxSession).getAttachment(attachmentId);
@@ -94,4 +107,15 @@ public class StoreAttachmentManager implements AttachmentManager {
}
return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContent(attachmentId);
}
+
+ @Override
+ public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) {
+ return existsReactive(attachmentId, mailboxSession)
+ .flatMap(exist -> {
+ if (!exist) {
+ return Mono.error(new AttachmentNotFoundException(attachmentId.getId()));
+ }
+ return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContentReactive(attachmentId);
+ });
+ }
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
index 10967da25c..f566aa5f49 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
@@ -39,8 +39,18 @@ public interface AttachmentMapper extends Mapper {
InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException;
+ default Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+ return Mono.fromCallable(() -> loadAttachmentContent(attachmentId))
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ }
+
AttachmentMetadata getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException;
+ default Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+ return Mono.fromCallable(() -> getAttachment(attachmentId))
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ }
+
List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds);
List<MessageAttachmentMetadata> storeAttachments(Collection<ParsedAttachment> attachments, MessageId ownerMessageId) throws MailboxException;
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
index c3e1d6d34e..5d7211d4b5 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
@@ -19,6 +19,8 @@
package org.apache.james.jmap.draft.methods;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -156,11 +158,26 @@ public class BlobManagerImpl implements BlobManager {
BlobId blobId = BlobId.of(attachment.getAttachmentId());
return Blob.builder()
.id(blobId)
- .payload(() -> {
- try {
- return attachmentManager.loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
- } catch (AttachmentNotFoundException e) {
- throw new BlobNotFoundException(blobId, e);
+ .payload(new Blob.InputStreamSupplier() {
+ @Override
+ public InputStream load() throws IOException, BlobNotFoundException {
+ try {
+ return loadReactive().block();
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ if (e.getCause() instanceof BlobNotFoundException) {
+ throw (BlobNotFoundException) e.getCause();
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public Mono<InputStream> loadReactive() {
+ return attachmentManager.loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession)
+ .onErrorResume(AttachmentNotFoundException.class, e -> Mono.error(new BlobNotFoundException(blobId, e)));
}
})
.size(attachment.getSize())
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
index a4bae5353c..9e718f1ebb 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
@@ -25,11 +25,14 @@ import java.util.Objects;
import org.apache.james.jmap.draft.exceptions.BlobNotFoundException;
import org.apache.james.mailbox.model.ContentType;
+import org.apache.james.util.ReactorUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
+
public class Blob {
@FunctionalInterface
@@ -40,6 +43,11 @@ public class Blob {
* The caller is responsible of closing it.
*/
InputStream load() throws IOException, BlobNotFoundException;
+
+ default Mono<InputStream> loadReactive() {
+ return Mono.fromCallable(this::load)
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ }
}
public static class Builder {
@@ -111,6 +119,10 @@ public class Blob {
return payload.load();
}
+ public Mono<InputStream> getStreamReactive() {
+ return payload.loadReactive();
+ }
+
public long getSize() {
return size;
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
index 9c55521dd9..40231a7850 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
@@ -213,11 +213,13 @@ public class DownloadRoutes implements JMAPRoutes {
String blobId = downloadPath.getBlobId();
return Mono.from(blobManager.retrieve(ImmutableList.of(BlobId.of(blobId)), mailboxSession))
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
.switchIfEmpty(Mono.error(() -> new BlobNotFoundException(BlobId.of(blobId))))
.flatMap(blob -> Mono.usingWhen(
- Mono.fromCallable(blob::getStream),
+ blob.getStreamReactive(),
stream -> downloadBlob(downloadPath.getName(), response, blob.getSize(), blob.getContentType(), stream),
- stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow())))
+ stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow()))
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
.onErrorResume(BlobNotFoundException.class, e -> {
LOGGER.info("Attachment '{}' not found", blobId, e);
return response.status(NOT_FOUND).send();
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
index 1b412653ac..5fdf8d096e 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
@@ -63,6 +63,7 @@ import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
class BlobManagerImplTest {
static final String ID = "abc";
@@ -97,8 +98,8 @@ class BlobManagerImplTest {
.size(BYTES.length)
.type(CONTENT_TYPE)
.build());
- when(attachmentManager.loadAttachmentContent(ATTACHMENT_ID, session))
- .thenReturn(new ByteArrayInputStream(BYTES));
+ when(attachmentManager.loadAttachmentContentReactive(ATTACHMENT_ID, session))
+ .thenReturn(Mono.just(new ByteArrayInputStream(BYTES)));
Blob blob = blobManager.retrieve(BLOB_ID_ATTACHMENT, session);
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 8b3446bd55..148111a084 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -178,8 +178,9 @@ class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager)
AttachmentId.from(blobId.value.value) match {
case attachmentId: AttachmentId =>
Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match {
- case Success(attachmentMetadata) => Applicable(
- SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession))))
+ case Success(attachmentMetadata) =>
+ Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession))
+ .map(content => AttachmentBlob(attachmentMetadata, content)))
case Failure(_) => NonApplicable
}
case _ => NonApplicable
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org