You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/04 10:01:58 UTC

[james-project] 11/14: JAMES-3149 Reactify GetMessages mailbox store layer

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 03ceeddc552c456c9956ce1c1c19a89591f661ed
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 15 11:00:46 2020 +0700

    JAMES-3149 Reactify GetMessages mailbox store layer
---
 .../org/apache/james/mailbox/MessageIdManager.java |  3 +-
 .../org/apache/james/mailbox/RightManager.java     | 10 +++--
 .../cassandra/mail/CassandraMessageIdMapper.java   | 12 ++++--
 .../cassandra/CassandraTestSystemFixture.java      |  1 -
 .../manager/InMemoryIntegrationResources.java      |  2 +-
 .../james/mailbox/store/StoreMailboxManager.java   |  5 +++
 .../james/mailbox/store/StoreMessageIdManager.java | 43 ++++++++++++++--------
 .../james/mailbox/store/StoreRightManager.java     | 13 +++++++
 .../james/mailbox/store/mail/MessageIdMapper.java  |  6 +++
 .../cassandra/host/CassandraHostSystem.java        |  2 +-
 .../model/message/view/MessageFastViewFactory.java |  2 +-
 .../model/message/view/MessageFullViewFactory.java |  2 +-
 .../message/view/MessageHeaderViewFactory.java     |  2 +-
 .../message/view/MessageMetadataViewFactory.java   |  2 +-
 .../model/message/view/MessageViewFactory.java     |  2 +-
 .../message/view/MessageFastViewFactoryTest.java   | 19 ++++++----
 .../message/view/MessageFullViewFactoryTest.java   |  4 +-
 .../message/view/MessageHeaderViewFactoryTest.java |  4 +-
 .../view/MessageMetadataViewFactoryTest.java       |  4 +-
 ...mputeMessageFastViewProjectionListenerTest.java |  7 ++--
 20 files changed, 97 insertions(+), 48 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index b91ec24..92895c4 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -32,6 +32,7 @@ import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageResult;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableList;
 
@@ -45,7 +46,7 @@ public interface MessageIdManager {
 
     List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
 
-    default Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
+    default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
         try {
             return Flux.fromIterable(getMessages(messageIds, minimal, mailboxSession));
         } catch (MailboxException e) {
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
index 9ba7f87..b37b212 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
@@ -21,9 +21,11 @@ package org.apache.james.mailbox;
 
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxACL;
+import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights;
 import org.apache.james.mailbox.model.MailboxACL.Right;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.reactivestreams.Publisher;
 
 public interface RightManager {
     /**
@@ -76,7 +78,7 @@ public interface RightManager {
      * @return result suitable for the LISTRIGHTS IMAP command
      * @throws MailboxException in case of unknown mailbox or unsupported right
      */
-    MailboxACL.Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey identifier, MailboxSession session) throws MailboxException;
+    Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey identifier, MailboxSession session) throws MailboxException;
 
     MailboxACL listRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
 
@@ -91,7 +93,7 @@ public interface RightManager {
      *         {@code session.getUser()} is null.
      * @throws MailboxException in case of unknown mailbox or unsupported right
      */
-    MailboxACL.Rfc4314Rights myRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
+    Rfc4314Rights myRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
 
 
     /**
@@ -105,7 +107,9 @@ public interface RightManager {
      *         {@code session.getUser()} is null.
      * @throws MailboxException in case of unknown mailbox or unsupported right
      */
-    MailboxACL.Rfc4314Rights myRights(MailboxId mailboxId, MailboxSession session) throws MailboxException;
+    Rfc4314Rights myRights(MailboxId mailboxId, MailboxSession session) throws MailboxException;
+
+    Publisher<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session);
 
     /**
      * Update the Mailbox ACL of the designated mailbox. We can either ADD REPLACE or REMOVE entries.
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 25c473c..43b3c7c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -92,15 +92,19 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
+        return findReactive(messageIds, fetchType)
+            .collectList()
+            .block();
+    }
+
+    @Override
+    public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
-            .publishOn(Schedulers.elastic())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .groupBy(MailboxMessage::getMailboxId)
-            .flatMap(this::keepMessageIfMailboxExists)
-            .collectList()
-            .block();
+            .flatMap(this::keepMessageIfMailboxExists);
     }
 
     private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index bf2c2e8..6524206 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -91,7 +91,6 @@ public class CassandraTestSystemFixture {
             mailboxManager,
             mapperFactory,
             eventBus,
-            new CassandraMessageId.Factory(),
             quotaManager,
             new DefaultUserQuotaRootResolver(mailboxManager.getSessionProvider(), mapperFactory),
             preDeletionHooks);
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
index 0160d39..a80ff7a 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
@@ -317,7 +317,7 @@ public class InMemoryIntegrationResources implements IntegrationResources<StoreM
             MailboxManagerPreInstanciationStage preInstanciationStage = new MailboxManagerPreInstanciationStage(mailboxSessionMapperFactory, sessionProvider);
             PreDeletionHooks hooks = createHooks(preInstanciationStage);
             StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mailboxSessionMapperFactory,
-                eventBus, messageIdFactory, quotaManager, quotaRootResolver, hooks);
+                eventBus, quotaManager, quotaRootResolver, hooks);
 
             StoreAttachmentManager attachmentManager = new StoreAttachmentManager(mailboxSessionMapperFactory, messageIdManager);
             MailboxManagerSearchIndexStage searchIndexStage = new MailboxManagerSearchIndexStage(mailboxSessionMapperFactory, sessionProvider, attachmentManager);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 165cda0..1f72b2c 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -773,6 +773,11 @@ public class StoreMailboxManager implements MailboxManager {
     }
 
     @Override
+    public Mono<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session) {
+        return storeRightManager.myRightsReactive(mailboxId, session);
+    }
+
+    @Override
     public Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey key, MailboxSession session) throws MailboxException {
         return storeRightManager.listRights(mailboxPath, key, session);
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 086cb43..707924a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.function.Predicate;
 
 import javax.inject.Inject;
@@ -94,19 +95,17 @@ public class StoreMessageIdManager implements MessageIdManager {
     private final RightManager rightManager;
     private final MailboxSessionMapperFactory mailboxSessionMapperFactory;
     private final EventBus eventBus;
-    private final MessageId.Factory messageIdFactory;
     private final QuotaManager quotaManager;
     private final QuotaRootResolver quotaRootResolver;
     private final PreDeletionHooks preDeletionHooks;
 
     @Inject
     public StoreMessageIdManager(RightManager rightManager, MailboxSessionMapperFactory mailboxSessionMapperFactory,
-                                 EventBus eventBus, MessageId.Factory messageIdFactory,
-                                 QuotaManager quotaManager, QuotaRootResolver quotaRootResolver, PreDeletionHooks preDeletionHooks) {
+                                 EventBus eventBus, QuotaManager quotaManager, QuotaRootResolver quotaRootResolver,
+                                 PreDeletionHooks preDeletionHooks) {
         this.rightManager = rightManager;
         this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
         this.eventBus = eventBus;
-        this.messageIdFactory = messageIdFactory;
         this.quotaManager = quotaManager;
         this.quotaRootResolver = quotaRootResolver;
         this.preDeletionHooks = preDeletionHooks;
@@ -138,18 +137,26 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     @Override
-    public List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException {
-        MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
-
-        MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
-        List<MailboxMessage> messageList = messageIdMapper.find(messageIds, fetchType);
-
-        ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, messageList, Right.Read);
+    public List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+        return getMessagesReactive(messageIds, fetchGroup, mailboxSession)
+            .collectList()
+            .block();
+    }
 
-        return messageList.stream()
-            .filter(inMailboxes(allowedMailboxIds))
-            .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow())
-            .collect(Guavate.toImmutableList());
+    @Override
+    public Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+        try {
+            MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
+
+            MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
+            return messageIdMapper.findReactive(messageIds, fetchType)
+                .groupBy(MailboxMessage::getMailboxId)
+                .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()))
+                .flatMap(Function.identity())
+                .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
+        } catch (MailboxException e) {
+            return Flux.error(e);
+        }
     }
 
     private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, List<MailboxMessage> messageList, Right... rights) {
@@ -426,6 +433,12 @@ public class StoreMessageIdManager implements MessageIdManager {
             .fallbackTo(any -> false);
     }
 
+    private Function<MailboxId, Mono<Boolean>> hasRightsOnMailboxReactive(MailboxSession session, Right... rights) {
+        return mailboxId -> Mono.from(rightManager.myRightsReactive(mailboxId, session))
+            .map(myRights -> myRights.contains(rights))
+            .onErrorResume(any -> Mono.just(false));
+    }
+
     private void assertRightsOnMailboxes(Collection<MailboxId> mailboxIds, MailboxSession mailboxSession, Right... rights) throws MailboxNotFoundException {
         Optional<MailboxId> mailboxForbidden = mailboxIds.stream()
             .filter(hasRightsOnMailbox(mailboxSession, rights).negate())
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 5707f15..1d33f99 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -53,6 +53,8 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+
 public class StoreRightManager implements RightManager {
     public static final boolean GROUP_FOLDER = true;
 
@@ -100,6 +102,17 @@ public class StoreRightManager implements RightManager {
         return myRights(mailbox, session);
     }
 
+    @Override
+    public Mono<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session) {
+        try {
+            MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
+            return mapper.findMailboxByIdReactive(mailboxId)
+                .map(Throwing.function(mailbox -> myRights(mailbox, session)));
+        } catch (MailboxException e) {
+            return Mono.error(e);
+        }
+    }
+
     public Rfc4314Rights myRights(Mailbox mailbox, MailboxSession session) throws UnsupportedRightException {
         Username username = session.getUser();
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 094ce72..eeecd30 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -34,10 +34,16 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 
 import com.google.common.collect.Multimap;
 
+import reactor.core.publisher.Flux;
+
 public interface MessageIdMapper {
 
     List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType);
 
+    default Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
+        return Flux.fromIterable(find(messageIds, fetchType));
+    }
+
     List<MailboxId> findMailboxes(MessageId messageId);
 
     void save(MailboxMessage mailboxMessage) throws MailboxNotFoundException, MailboxException;
diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
index 4cb76d0..9bd072b 100644
--- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
+++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
@@ -111,7 +111,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
         ListeningCurrentQuotaUpdater quotaUpdater = new ListeningCurrentQuotaUpdater(currentQuotaManager, quotaRootResolver, eventBus, quotaManager);
         QuotaComponents quotaComponents = new QuotaComponents(perUserMaxQuotaManager, quotaManager, quotaRootResolver);
 
-        StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory, eventBus, messageIdFactory, quotaManager, quotaRootResolver, PreDeletionHooks.NO_PRE_DELETION_HOOK);
+        StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory, eventBus, quotaManager, quotaRootResolver, PreDeletionHooks.NO_PRE_DELETION_HOOK);
         StoreAttachmentManager attachmentManager = new StoreAttachmentManager(mapperFactory, messageIdManager);
 
         MessageSearchIndex index = new SimpleMessageSearchIndex(mapperFactory, mapperFactory, new DefaultTextExtractor(), attachmentManager);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
index 514e666..5de639c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
@@ -137,7 +137,7 @@ public class MessageFastViewFactory implements MessageViewFactory<MessageFastVie
         if (messageIds.isEmpty()) {
             return Flux.empty();
         }
-        return messageIdManager.getMessagesReactive(messageIds, fetchGroup, mailboxSession)
+        return Flux.from(messageIdManager.getMessagesReactive(messageIds, fetchGroup, mailboxSession))
             .onErrorResume(MailboxException.class, ex -> {
                 LOGGER.error("cannot read messages {}", messageIds, ex);
                 return Flux.empty();
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
index ee61d4f..2752b79 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
@@ -88,7 +88,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie
 
     @Override
     public Flux<MessageFullView> fromMessageIds(List<MessageId> messageIds, MailboxSession mailboxSession) {
-        Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession);
+        Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
         return Helpers.toMessageViews(messages, this::fromMessageResults);
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
index 89c6398..4d4df61 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
@@ -55,7 +55,7 @@ public class MessageHeaderViewFactory implements MessageViewFactory<MessageHeade
 
     @Override
     public Flux<MessageHeaderView> fromMessageIds(List<MessageId> messageIds, MailboxSession mailboxSession) {
-        Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.HEADERS, mailboxSession);
+        Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.HEADERS, mailboxSession));
         return Helpers.toMessageViews(messages, this::fromMessageResults);
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
index 3c9d2cf..15ce8ee 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
@@ -51,7 +51,7 @@ public class MessageMetadataViewFactory implements MessageViewFactory<MessageMet
 
     @Override
     public Flux<MessageMetadataView> fromMessageIds(List<MessageId> messageIds, MailboxSession session) {
-        Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, session);
+        Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, session));
         return Helpers.toMessageViews(messages, this::fromMessageResults);
     }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
index 76b8c9a..88b8efe 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
@@ -133,8 +133,8 @@ public interface MessageViewFactory<T extends MessageView> {
         static <T extends MessageView> Flux<T> toMessageViews(Flux<MessageResult> messageResults, FromMessageResult<T> converter) {
             return messageResults
                 .groupBy(MessageResult::getMessageId)
-                .filterWhen(Flux::hasElements)
                 .flatMap(Flux::collectList)
+                .filter(list -> !list.isEmpty())
                 .flatMap(toMessageViews(converter));
         }
 
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
index 3c4d2de..8db9d4a 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
@@ -27,7 +27,7 @@ import static org.apache.james.jmap.draft.model.message.view.MessageViewFixture.
 import static org.apache.james.jmap.draft.model.message.view.MessageViewFixture.JACOB_EMAIL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
 import java.util.List;
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class MessageFastViewFactoryTest {
@@ -152,7 +153,7 @@ class MessageFastViewFactoryTest {
 
     @Test
     void fromMessageIdsShouldReturnAMessageWithComputedFastProperties() throws Exception {
-        MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(previewComputedMessage1.getMessageId()), session).get(0);
+        MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(previewComputedMessage1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(previewComputedMessage1.getMessageId());
             softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -177,7 +178,7 @@ class MessageFastViewFactoryTest {
 
     @Test
     void fromMessageIdsShouldReturnAMessageWithPropertiesComputedFromFullMessageWhenNotPreComputed() throws Exception {
-        MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(missingPreviewComputedMessage1.getMessageId()), session).get(0);
+        MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(missingPreviewComputedMessage1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(missingPreviewComputedMessage1.getMessageId());
             softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -208,7 +209,7 @@ class MessageFastViewFactoryTest {
                     previewComputedMessage3.getMessageId(),
                     missingPreviewComputedMessage1.getMessageId(),
                     previewComputedMessage1.getMessageId()),
-                session);
+                session).collectList().block();
 
         assertThat(actual)
             .hasSize(4)
@@ -221,16 +222,18 @@ class MessageFastViewFactoryTest {
     }
 
     @Test
-    void fromMessageIdsShouldKeepProcessingEvenWhenFetchingFail() throws Exception {
-        doThrow(new MailboxException("mock exception"))
+    void fromMessageIdsShouldKeepProcessingEvenWhenFetchingFail() {
+        doReturn(Flux.error(new MailboxException("mock exception")))
             .doCallRealMethod()
-            .when(messageIdManager).getMessages(any(), any(), any());
+            .when(messageIdManager).getMessagesReactive(any(), any(), any());
 
         List<MessageFastView> actual = messageFastViewFactory
             .fromMessageIds(ImmutableList.of(
                     missingPreviewComputedMessage1.getMessageId(),
                     previewComputedMessage1.getMessageId()),
-                session);
+                session)
+            .collectList().block();
+
 
         assertThat(actual)
             .hasSize(1)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
index 02b67d2..67eeb92 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
@@ -132,7 +132,7 @@ class MessageFullViewFactoryTest {
 
     @Test
     void fromMessageResultsShouldReturnCorrectView() throws Exception {
-        MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
             softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -163,7 +163,7 @@ class MessageFullViewFactoryTest {
         messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
         bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
 
-        MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
             softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
index b10f4fc..9eda7bc 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
@@ -81,7 +81,7 @@ class MessageHeaderViewFactoryTest {
 
     @Test
     void fromMessageResultsShouldReturnCorrectView() throws Exception {
-        MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
             softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -106,7 +106,7 @@ class MessageHeaderViewFactoryTest {
         messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
         bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
 
-        MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
             softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
             softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
index 5297c72..19651ec 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
@@ -73,7 +73,7 @@ class MessageMetadataViewFactoryTest {
 
     @Test
     void fromMessageResultsShouldReturnCorrectView() throws Exception {
-        MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
            softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -89,7 +89,7 @@ class MessageMetadataViewFactoryTest {
         messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
         bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
 
-        MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+        MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
         SoftAssertions.assertSoftly(softly -> {
            softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
            softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
index 98794c1..d6affb2 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap.event;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 
@@ -65,6 +66,7 @@ import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class ComputeMessageFastViewProjectionListenerTest {
@@ -266,9 +268,8 @@ class ComputeMessageFastViewProjectionListenerTest {
 
     @Test
     void shouldStoreEventInDeadLettersWhenGetMessagesException() throws Exception {
-        doThrow(new MailboxException())
-            .when(messageIdManager)
-            .getMessages(any(), any(), any());
+        doReturn(Flux.error(new MailboxException("mock exception")))
+            .when(messageIdManager).getMessagesReactive(any(), any(), any());
 
         inboxMessageManager.appendMessage(
             MessageManager.AppendCommand.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org