You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/03/19 07:44:48 UTC

[james-project] branch master updated (85d4adf8bc -> a120e6a1a3)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 85d4adf8bc JAMES-3754 MDC context for LIST EXTENDED (RFC-5258)
     new a3fed7cc07 [FIX] Reactify attachments
     new a120e6a1a3 [FIX] Sequential execution for LIST commands

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/mailbox/AttachmentContentLoader.java     |  7 ++++++
 .../apache/james/mailbox/AttachmentManager.java    |  9 ++++++++
 .../cassandra/mail/CassandraAttachmentMapper.java  | 14 +++++++++++
 .../mailbox/store/StoreAttachmentManager.java      | 24 +++++++++++++++++++
 .../james/mailbox/store/mail/AttachmentMapper.java | 10 ++++++++
 .../apache/james/imap/processor/ListProcessor.java |  4 ++--
 .../james/jmap/draft/methods/BlobManagerImpl.java  | 27 ++++++++++++++++++----
 .../org/apache/james/jmap/draft/model/Blob.java    | 12 ++++++++++
 .../org/apache/james/jmap/http/DownloadRoutes.java |  6 +++--
 .../jmap/draft/methods/BlobManagerImplTest.java    |  5 ++--
 .../apache/james/jmap/routes/DownloadRoutes.scala  |  5 ++--
 11 files changed, 110 insertions(+), 13 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/02: [FIX] Sequential execution for LIST commands

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a120e6a1a3d4a81f63ccfc355bff83f325204760
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 14 18:19:28 2023 +0700

    [FIX] Sequential execution for LIST commands
    
    Being non-sequential leads to data races
    writing the data at the transport layer, causing
    massive issues.
---
 .../src/main/java/org/apache/james/imap/processor/ListProcessor.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/ListProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/ListProcessor.java
index 7f7ad6b6be..a98a27e1ea 100644
--- a/protocols/imap/src/main/java/org/apache/james/imap/processor/ListProcessor.java
+++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/ListProcessor.java
@@ -227,7 +227,7 @@ public class ListProcessor<T extends ListRequest> extends AbstractMailboxProcess
                 }
             })
             .doOnNext(metaData -> respondMyRights(request, responder, mailboxSession, metaData))
-            .flatMap(metaData -> request.getStatusDataItems().map(statusDataItems -> statusProcessor.sendStatus(retrieveMessageManager(metaData, mailboxSession), statusDataItems, responder, session, mailboxSession)).orElse(Mono.empty()))
+            .concatMap(metaData -> request.getStatusDataItems().map(statusDataItems -> statusProcessor.sendStatus(retrieveMessageManager(metaData, mailboxSession), statusDataItems, responder, session, mailboxSession)).orElse(Mono.empty()))
             .then();
     }
 
@@ -247,7 +247,7 @@ public class ListProcessor<T extends ListRequest> extends AbstractMailboxProcess
             .flatMapIterable(list -> list)
             .doOnNext(pathAndResponse -> responder.respond(pathAndResponse.getMiddle()))
             .doOnNext(pathAndResponse -> pathAndResponse.getRight().ifPresent(mailboxMetaData -> respondMyRights(request, responder, mailboxSession, mailboxMetaData)))
-            .flatMap(pathAndResponse -> sendStatusWhenSubscribed(session, request, responder, mailboxSession, pathAndResponse))
+            .concatMap(pathAndResponse -> sendStatusWhenSubscribed(session, request, responder, mailboxSession, pathAndResponse))
             .then();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/02: [FIX] Reactify attachments

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a3fed7cc07c5591e582a00702ef667bcdacc6586
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 14 18:18:27 2023 +0700

    [FIX] Reactify attachments
    
    This avoids blocking calls into a parrallel thread,
    or into a Cassandra driver thread...
---
 .../james/mailbox/AttachmentContentLoader.java     |  7 ++++++
 .../apache/james/mailbox/AttachmentManager.java    |  9 ++++++++
 .../cassandra/mail/CassandraAttachmentMapper.java  | 14 +++++++++++
 .../mailbox/store/StoreAttachmentManager.java      | 24 +++++++++++++++++++
 .../james/mailbox/store/mail/AttachmentMapper.java | 10 ++++++++
 .../james/jmap/draft/methods/BlobManagerImpl.java  | 27 ++++++++++++++++++----
 .../org/apache/james/jmap/draft/model/Blob.java    | 12 ++++++++++
 .../org/apache/james/jmap/http/DownloadRoutes.java |  6 +++--
 .../jmap/draft/methods/BlobManagerImplTest.java    |  5 ++--
 .../apache/james/jmap/routes/DownloadRoutes.scala  |  5 ++--
 10 files changed, 108 insertions(+), 11 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
index 9536c722a1..5417a56197 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java
@@ -24,9 +24,16 @@ import java.io.InputStream;
 
 import org.apache.james.mailbox.exception.AttachmentNotFoundException;
 import org.apache.james.mailbox.model.AttachmentMetadata;
+import org.apache.james.util.ReactorUtils;
+
+import reactor.core.publisher.Mono;
 
 public interface AttachmentContentLoader {
 
     InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException;
 
+    default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+        return Mono.fromCallable(() -> load(attachment, mailboxSession))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
 }
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index 5ed6de1ad3..2387ce722c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -28,6 +28,8 @@ import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.AttachmentMetadata;
 
+import reactor.core.publisher.Mono;
+
 public interface AttachmentManager extends AttachmentContentLoader {
 
     boolean exists(AttachmentId attachmentId, MailboxSession session) throws MailboxException;
@@ -38,9 +40,16 @@ public interface AttachmentManager extends AttachmentContentLoader {
 
     InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
 
+    Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession);
+
     @Override
     default InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException {
         return loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
     }
 
+    @Override
+    default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) {
+        return loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession);
+    }
+
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index a1200efc53..b1b3e16fb0 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -72,6 +72,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
             .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId()));
     }
 
+    @Override
+    public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+        Preconditions.checkArgument(attachmentId != null);
+        return getAttachmentInternal(attachmentId)
+            .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId())));
+    }
+
     @Override
     public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
@@ -89,6 +96,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper {
             .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString()));
     }
 
+    @Override
+    public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+        return attachmentDAOV2.getAttachment(attachmentId, messageIdFallback(attachmentId))
+            .flatMap(daoAttachment -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST)))
+            .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString())));
+    }
+
     private Mono<CassandraMessageId> messageIdFallback(AttachmentId attachmentId) {
         return attachmentMessageIdDAO.getOwnerMessageIds(attachmentId)
             .map(CassandraMessageId.class::cast)
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index c095c9a0e4..b11ee9eccc 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public class StoreAttachmentManager implements AttachmentManager {
     private final AttachmentMapperFactory attachmentMapperFactory;
     private final MessageIdManager messageIdManager;
@@ -55,10 +57,21 @@ public class StoreAttachmentManager implements AttachmentManager {
         return exists(attachment, session);
     }
 
+    public Mono<Boolean> existsReactive(AttachmentId attachmentId, MailboxSession session) {
+        return attachmentMapperFactory.getAttachmentMapper(session)
+            .getAttachmentReactive(attachmentId)
+            .flatMap(attachment -> existsReactive(attachment, session));
+    }
+
     public boolean exists(AttachmentMetadata attachment, MailboxSession session) throws MailboxException {
         return !messageIdManager.accessibleMessages(ImmutableList.of(attachment.getMessageId()), session).isEmpty();
     }
 
+    public Mono<Boolean> existsReactive(AttachmentMetadata attachment, MailboxSession session) {
+        return Mono.from(messageIdManager.accessibleMessagesReactive(ImmutableList.of(attachment.getMessageId()), session))
+            .map(accessibleMessages -> !accessibleMessages.isEmpty());
+    }
+
     @Override
     public AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException {
         AttachmentMetadata attachment = attachmentMapperFactory.getAttachmentMapper(mailboxSession).getAttachment(attachmentId);
@@ -94,4 +107,15 @@ public class StoreAttachmentManager implements AttachmentManager {
         }
         return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContent(attachmentId);
     }
+
+    @Override
+    public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) {
+        return existsReactive(attachmentId, mailboxSession)
+            .flatMap(exist -> {
+                    if (!exist) {
+                        return Mono.error(new AttachmentNotFoundException(attachmentId.getId()));
+                    }
+                    return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContentReactive(attachmentId);
+                });
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
index 10967da25c..f566aa5f49 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java
@@ -39,8 +39,18 @@ public interface AttachmentMapper extends Mapper {
 
     InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException;
 
+    default Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) {
+        return Mono.fromCallable(() -> loadAttachmentContent(attachmentId))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
+
     AttachmentMetadata getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException;
 
+    default Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) {
+        return Mono.fromCallable(() -> getAttachment(attachmentId))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+    }
+
     List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds);
 
     List<MessageAttachmentMetadata> storeAttachments(Collection<ParsedAttachment> attachments, MessageId ownerMessageId) throws MailboxException;
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
index c3e1d6d34e..5d7211d4b5 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -156,11 +158,26 @@ public class BlobManagerImpl implements BlobManager {
         BlobId blobId = BlobId.of(attachment.getAttachmentId());
         return Blob.builder()
             .id(blobId)
-            .payload(() -> {
-                try {
-                    return attachmentManager.loadAttachmentContent(attachment.getAttachmentId(), mailboxSession);
-                } catch (AttachmentNotFoundException e) {
-                    throw new BlobNotFoundException(blobId, e);
+            .payload(new Blob.InputStreamSupplier() {
+                @Override
+                public InputStream load() throws IOException, BlobNotFoundException {
+                    try {
+                        return loadReactive().block();
+                    } catch (RuntimeException e) {
+                        if (e.getCause() instanceof IOException) {
+                            throw (IOException) e.getCause();
+                        }
+                        if (e.getCause() instanceof BlobNotFoundException) {
+                            throw (BlobNotFoundException) e.getCause();
+                        }
+                        throw e;
+                    }
+                }
+
+                @Override
+                public Mono<InputStream> loadReactive() {
+                    return attachmentManager.loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession)
+                        .onErrorResume(AttachmentNotFoundException.class, e -> Mono.error(new BlobNotFoundException(blobId, e)));
                 }
             })
             .size(attachment.getSize())
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
index a4bae5353c..9e718f1ebb 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java
@@ -25,11 +25,14 @@ import java.util.Objects;
 
 import org.apache.james.jmap.draft.exceptions.BlobNotFoundException;
 import org.apache.james.mailbox.model.ContentType;
+import org.apache.james.util.ReactorUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class Blob {
 
     @FunctionalInterface
@@ -40,6 +43,11 @@ public class Blob {
          * The caller is responsible of closing it.
          */
         InputStream load() throws IOException, BlobNotFoundException;
+
+        default Mono<InputStream> loadReactive() {
+            return Mono.fromCallable(this::load)
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+        }
     }
 
     public static class Builder {
@@ -111,6 +119,10 @@ public class Blob {
         return payload.load();
     }
 
+    public Mono<InputStream> getStreamReactive() {
+        return payload.loadReactive();
+    }
+
     public long getSize() {
         return size;
     }
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
index 9c55521dd9..40231a7850 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java
@@ -213,11 +213,13 @@ public class DownloadRoutes implements JMAPRoutes {
         String blobId = downloadPath.getBlobId();
 
         return Mono.from(blobManager.retrieve(ImmutableList.of(BlobId.of(blobId)), mailboxSession))
+            .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
             .switchIfEmpty(Mono.error(() -> new BlobNotFoundException(BlobId.of(blobId))))
             .flatMap(blob -> Mono.usingWhen(
-                Mono.fromCallable(blob::getStream),
+                blob.getStreamReactive(),
                 stream -> downloadBlob(downloadPath.getName(), response, blob.getSize(), blob.getContentType(), stream),
-                stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow())))
+                stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow()))
+                .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER))
             .onErrorResume(BlobNotFoundException.class, e -> {
                 LOGGER.info("Attachment '{}' not found", blobId, e);
                 return response.status(NOT_FOUND).send();
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
index 1b412653ac..5fdf8d096e 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java
@@ -63,6 +63,7 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 class BlobManagerImplTest {
     static final String ID = "abc";
@@ -97,8 +98,8 @@ class BlobManagerImplTest {
                 .size(BYTES.length)
                 .type(CONTENT_TYPE)
                 .build());
-        when(attachmentManager.loadAttachmentContent(ATTACHMENT_ID, session))
-            .thenReturn(new ByteArrayInputStream(BYTES));
+        when(attachmentManager.loadAttachmentContentReactive(ATTACHMENT_ID, session))
+            .thenReturn(Mono.just(new ByteArrayInputStream(BYTES)));
 
         Blob blob = blobManager.retrieve(BLOB_ID_ATTACHMENT, session);
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 8b3446bd55..148111a084 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -178,8 +178,9 @@ class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager)
     AttachmentId.from(blobId.value.value) match {
       case attachmentId: AttachmentId =>
         Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match {
-          case Success(attachmentMetadata) => Applicable(
-            SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession))))
+          case Success(attachmentMetadata) =>
+            Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession))
+              .map(content => AttachmentBlob(attachmentMetadata, content)))
           case Failure(_) => NonApplicable
         }
       case _ => NonApplicable


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org