You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by rc...@apache.org on 2020/05/04 10:01:58 UTC
[james-project] 11/14: JAMES-3149 Reactify GetMessages mailbox store layer
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 03ceeddc552c456c9956ce1c1c19a89591f661ed
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 15 11:00:46 2020 +0700
JAMES-3149 Reactify GetMessages mailbox store layer
---
.../org/apache/james/mailbox/MessageIdManager.java | 3 +-
.../org/apache/james/mailbox/RightManager.java | 10 +++--
.../cassandra/mail/CassandraMessageIdMapper.java | 12 ++++--
.../cassandra/CassandraTestSystemFixture.java | 1 -
.../manager/InMemoryIntegrationResources.java | 2 +-
.../james/mailbox/store/StoreMailboxManager.java | 5 +++
.../james/mailbox/store/StoreMessageIdManager.java | 43 ++++++++++++++--------
.../james/mailbox/store/StoreRightManager.java | 13 +++++++
.../james/mailbox/store/mail/MessageIdMapper.java | 6 +++
.../cassandra/host/CassandraHostSystem.java | 2 +-
.../model/message/view/MessageFastViewFactory.java | 2 +-
.../model/message/view/MessageFullViewFactory.java | 2 +-
.../message/view/MessageHeaderViewFactory.java | 2 +-
.../message/view/MessageMetadataViewFactory.java | 2 +-
.../model/message/view/MessageViewFactory.java | 2 +-
.../message/view/MessageFastViewFactoryTest.java | 19 ++++++----
.../message/view/MessageFullViewFactoryTest.java | 4 +-
.../message/view/MessageHeaderViewFactoryTest.java | 4 +-
.../view/MessageMetadataViewFactoryTest.java | 4 +-
...mputeMessageFastViewProjectionListenerTest.java | 7 ++--
20 files changed, 97 insertions(+), 48 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index b91ec24..92895c4 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -32,6 +32,7 @@ import org.apache.james.mailbox.model.FetchGroup;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MessageResult;
+import org.reactivestreams.Publisher;
import com.google.common.collect.ImmutableList;
@@ -45,7 +46,7 @@ public interface MessageIdManager {
List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
- default Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
+ default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
try {
return Flux.fromIterable(getMessages(messageIds, minimal, mailboxSession));
} catch (MailboxException e) {
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
index 9ba7f87..b37b212 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/RightManager.java
@@ -21,9 +21,11 @@ package org.apache.james.mailbox;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxACL;
+import org.apache.james.mailbox.model.MailboxACL.Rfc4314Rights;
import org.apache.james.mailbox.model.MailboxACL.Right;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
+import org.reactivestreams.Publisher;
public interface RightManager {
/**
@@ -76,7 +78,7 @@ public interface RightManager {
* @return result suitable for the LISTRIGHTS IMAP command
* @throws MailboxException in case of unknown mailbox or unsupported right
*/
- MailboxACL.Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey identifier, MailboxSession session) throws MailboxException;
+ Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey identifier, MailboxSession session) throws MailboxException;
MailboxACL listRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
@@ -91,7 +93,7 @@ public interface RightManager {
* {@code session.getUser()} is null.
* @throws MailboxException in case of unknown mailbox or unsupported right
*/
- MailboxACL.Rfc4314Rights myRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
+ Rfc4314Rights myRights(MailboxPath mailboxPath, MailboxSession session) throws MailboxException;
/**
@@ -105,7 +107,9 @@ public interface RightManager {
* {@code session.getUser()} is null.
* @throws MailboxException in case of unknown mailbox or unsupported right
*/
- MailboxACL.Rfc4314Rights myRights(MailboxId mailboxId, MailboxSession session) throws MailboxException;
+ Rfc4314Rights myRights(MailboxId mailboxId, MailboxSession session) throws MailboxException;
+
+ Publisher<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session);
/**
* Update the Mailbox ACL of the designated mailbox. We can either ADD REPLACE or REMOVE entries.
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 25c473c..43b3c7c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -92,15 +92,19 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
@Override
public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
+ return findReactive(messageIds, fetchType)
+ .collectList()
+ .block();
+ }
+
+ @Override
+ public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
return Flux.fromStream(messageIds.stream())
- .publishOn(Schedulers.elastic())
.flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
.flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.groupBy(MailboxMessage::getMailboxId)
- .flatMap(this::keepMessageIfMailboxExists)
- .collectList()
- .block();
+ .flatMap(this::keepMessageIfMailboxExists);
}
private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
index bf2c2e8..6524206 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraTestSystemFixture.java
@@ -91,7 +91,6 @@ public class CassandraTestSystemFixture {
mailboxManager,
mapperFactory,
eventBus,
- new CassandraMessageId.Factory(),
quotaManager,
new DefaultUserQuotaRootResolver(mailboxManager.getSessionProvider(), mapperFactory),
preDeletionHooks);
diff --git a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
index 0160d39..a80ff7a 100644
--- a/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
+++ b/mailbox/memory/src/test/java/org/apache/james/mailbox/inmemory/manager/InMemoryIntegrationResources.java
@@ -317,7 +317,7 @@ public class InMemoryIntegrationResources implements IntegrationResources<StoreM
MailboxManagerPreInstanciationStage preInstanciationStage = new MailboxManagerPreInstanciationStage(mailboxSessionMapperFactory, sessionProvider);
PreDeletionHooks hooks = createHooks(preInstanciationStage);
StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mailboxSessionMapperFactory,
- eventBus, messageIdFactory, quotaManager, quotaRootResolver, hooks);
+ eventBus, quotaManager, quotaRootResolver, hooks);
StoreAttachmentManager attachmentManager = new StoreAttachmentManager(mailboxSessionMapperFactory, messageIdManager);
MailboxManagerSearchIndexStage searchIndexStage = new MailboxManagerSearchIndexStage(mailboxSessionMapperFactory, sessionProvider, attachmentManager);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 165cda0..1f72b2c 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -773,6 +773,11 @@ public class StoreMailboxManager implements MailboxManager {
}
@Override
+ public Mono<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session) {
+ return storeRightManager.myRightsReactive(mailboxId, session);
+ }
+
+ @Override
public Rfc4314Rights[] listRights(MailboxPath mailboxPath, MailboxACL.EntryKey key, MailboxSession session) throws MailboxException {
return storeRightManager.listRights(mailboxPath, key, session);
}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 086cb43..707924a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
import java.util.function.Predicate;
import javax.inject.Inject;
@@ -94,19 +95,17 @@ public class StoreMessageIdManager implements MessageIdManager {
private final RightManager rightManager;
private final MailboxSessionMapperFactory mailboxSessionMapperFactory;
private final EventBus eventBus;
- private final MessageId.Factory messageIdFactory;
private final QuotaManager quotaManager;
private final QuotaRootResolver quotaRootResolver;
private final PreDeletionHooks preDeletionHooks;
@Inject
public StoreMessageIdManager(RightManager rightManager, MailboxSessionMapperFactory mailboxSessionMapperFactory,
- EventBus eventBus, MessageId.Factory messageIdFactory,
- QuotaManager quotaManager, QuotaRootResolver quotaRootResolver, PreDeletionHooks preDeletionHooks) {
+ EventBus eventBus, QuotaManager quotaManager, QuotaRootResolver quotaRootResolver,
+ PreDeletionHooks preDeletionHooks) {
this.rightManager = rightManager;
this.mailboxSessionMapperFactory = mailboxSessionMapperFactory;
this.eventBus = eventBus;
- this.messageIdFactory = messageIdFactory;
this.quotaManager = quotaManager;
this.quotaRootResolver = quotaRootResolver;
this.preDeletionHooks = preDeletionHooks;
@@ -138,18 +137,26 @@ public class StoreMessageIdManager implements MessageIdManager {
}
@Override
- public List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException {
- MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
-
- MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
- List<MailboxMessage> messageList = messageIdMapper.find(messageIds, fetchType);
-
- ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, messageList, Right.Read);
+ public List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+ return getMessagesReactive(messageIds, fetchGroup, mailboxSession)
+ .collectList()
+ .block();
+ }
- return messageList.stream()
- .filter(inMailboxes(allowedMailboxIds))
- .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow())
- .collect(Guavate.toImmutableList());
+ @Override
+ public Flux<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup fetchGroup, MailboxSession mailboxSession) {
+ try {
+ MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
+
+ MessageMapper.FetchType fetchType = FetchGroupConverter.getFetchType(fetchGroup);
+ return messageIdMapper.findReactive(messageIds, fetchType)
+ .groupBy(MailboxMessage::getMailboxId)
+ .filterWhen(groupedFlux -> hasRightsOnMailboxReactive(mailboxSession, Right.Read).apply(groupedFlux.key()))
+ .flatMap(Function.identity())
+ .map(Throwing.function(messageResultConverter(fetchGroup)).sneakyThrow());
+ } catch (MailboxException e) {
+ return Flux.error(e);
+ }
}
private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, List<MailboxMessage> messageList, Right... rights) {
@@ -426,6 +433,12 @@ public class StoreMessageIdManager implements MessageIdManager {
.fallbackTo(any -> false);
}
+ private Function<MailboxId, Mono<Boolean>> hasRightsOnMailboxReactive(MailboxSession session, Right... rights) {
+ return mailboxId -> Mono.from(rightManager.myRightsReactive(mailboxId, session))
+ .map(myRights -> myRights.contains(rights))
+ .onErrorResume(any -> Mono.just(false));
+ }
+
private void assertRightsOnMailboxes(Collection<MailboxId> mailboxIds, MailboxSession mailboxSession, Right... rights) throws MailboxNotFoundException {
Optional<MailboxId> mailboxForbidden = mailboxIds.stream()
.filter(hasRightsOnMailbox(mailboxSession, rights).negate())
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 5707f15..1d33f99 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -53,6 +53,8 @@ import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
+import reactor.core.publisher.Mono;
+
public class StoreRightManager implements RightManager {
public static final boolean GROUP_FOLDER = true;
@@ -100,6 +102,17 @@ public class StoreRightManager implements RightManager {
return myRights(mailbox, session);
}
+ @Override
+ public Mono<Rfc4314Rights> myRightsReactive(MailboxId mailboxId, MailboxSession session) {
+ try {
+ MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session);
+ return mapper.findMailboxByIdReactive(mailboxId)
+ .map(Throwing.function(mailbox -> myRights(mailbox, session)));
+ } catch (MailboxException e) {
+ return Mono.error(e);
+ }
+ }
+
public Rfc4314Rights myRights(Mailbox mailbox, MailboxSession session) throws UnsupportedRightException {
Username username = session.getUser();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 094ce72..eeecd30 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -34,10 +34,16 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import com.google.common.collect.Multimap;
+import reactor.core.publisher.Flux;
+
public interface MessageIdMapper {
List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType);
+ default Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
+ return Flux.fromIterable(find(messageIds, fetchType));
+ }
+
List<MailboxId> findMailboxes(MessageId messageId);
void save(MailboxMessage mailboxMessage) throws MailboxNotFoundException, MailboxException;
diff --git a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
index 4cb76d0..9bd072b 100644
--- a/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
+++ b/mpt/impl/imap-mailbox/cassandra/src/test/java/org/apache/james/mpt/imapmailbox/cassandra/host/CassandraHostSystem.java
@@ -111,7 +111,7 @@ public class CassandraHostSystem extends JamesImapHostSystem {
ListeningCurrentQuotaUpdater quotaUpdater = new ListeningCurrentQuotaUpdater(currentQuotaManager, quotaRootResolver, eventBus, quotaManager);
QuotaComponents quotaComponents = new QuotaComponents(perUserMaxQuotaManager, quotaManager, quotaRootResolver);
- StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory, eventBus, messageIdFactory, quotaManager, quotaRootResolver, PreDeletionHooks.NO_PRE_DELETION_HOOK);
+ StoreMessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory, eventBus, quotaManager, quotaRootResolver, PreDeletionHooks.NO_PRE_DELETION_HOOK);
StoreAttachmentManager attachmentManager = new StoreAttachmentManager(mapperFactory, messageIdManager);
MessageSearchIndex index = new SimpleMessageSearchIndex(mapperFactory, mapperFactory, new DefaultTextExtractor(), attachmentManager);
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
index 514e666..5de639c 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactory.java
@@ -137,7 +137,7 @@ public class MessageFastViewFactory implements MessageViewFactory<MessageFastVie
if (messageIds.isEmpty()) {
return Flux.empty();
}
- return messageIdManager.getMessagesReactive(messageIds, fetchGroup, mailboxSession)
+ return Flux.from(messageIdManager.getMessagesReactive(messageIds, fetchGroup, mailboxSession))
.onErrorResume(MailboxException.class, ex -> {
LOGGER.error("cannot read messages {}", messageIds, ex);
return Flux.empty();
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
index ee61d4f..2752b79 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactory.java
@@ -88,7 +88,7 @@ public class MessageFullViewFactory implements MessageViewFactory<MessageFullVie
@Override
public Flux<MessageFullView> fromMessageIds(List<MessageId> messageIds, MailboxSession mailboxSession) {
- Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession);
+ Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.FULL_CONTENT, mailboxSession));
return Helpers.toMessageViews(messages, this::fromMessageResults);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
index 89c6398..4d4df61 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactory.java
@@ -55,7 +55,7 @@ public class MessageHeaderViewFactory implements MessageViewFactory<MessageHeade
@Override
public Flux<MessageHeaderView> fromMessageIds(List<MessageId> messageIds, MailboxSession mailboxSession) {
- Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.HEADERS, mailboxSession);
+ Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.HEADERS, mailboxSession));
return Helpers.toMessageViews(messages, this::fromMessageResults);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
index 3c9d2cf..15ce8ee 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactory.java
@@ -51,7 +51,7 @@ public class MessageMetadataViewFactory implements MessageViewFactory<MessageMet
@Override
public Flux<MessageMetadataView> fromMessageIds(List<MessageId> messageIds, MailboxSession session) {
- Flux<MessageResult> messages = messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, session);
+ Flux<MessageResult> messages = Flux.from(messageIdManager.getMessagesReactive(messageIds, FetchGroup.MINIMAL, session));
return Helpers.toMessageViews(messages, this::fromMessageResults);
}
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
index 76b8c9a..88b8efe 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/message/view/MessageViewFactory.java
@@ -133,8 +133,8 @@ public interface MessageViewFactory<T extends MessageView> {
static <T extends MessageView> Flux<T> toMessageViews(Flux<MessageResult> messageResults, FromMessageResult<T> converter) {
return messageResults
.groupBy(MessageResult::getMessageId)
- .filterWhen(Flux::hasElements)
.flatMap(Flux::collectList)
+ .filter(list -> !list.isEmpty())
.flatMap(toMessageViews(converter));
}
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
index 3c4d2de..8db9d4a 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFastViewFactoryTest.java
@@ -27,7 +27,7 @@ import static org.apache.james.jmap.draft.model.message.view.MessageViewFixture.
import static org.apache.james.jmap.draft.model.message.view.MessageViewFixture.JACOB_EMAIL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import java.util.List;
@@ -64,6 +64,7 @@ import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class MessageFastViewFactoryTest {
@@ -152,7 +153,7 @@ class MessageFastViewFactoryTest {
@Test
void fromMessageIdsShouldReturnAMessageWithComputedFastProperties() throws Exception {
- MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(previewComputedMessage1.getMessageId()), session).get(0);
+ MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(previewComputedMessage1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(previewComputedMessage1.getMessageId());
softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -177,7 +178,7 @@ class MessageFastViewFactoryTest {
@Test
void fromMessageIdsShouldReturnAMessageWithPropertiesComputedFromFullMessageWhenNotPreComputed() throws Exception {
- MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(missingPreviewComputedMessage1.getMessageId()), session).get(0);
+ MessageFastView actual = messageFastViewFactory.fromMessageIds(ImmutableList.of(missingPreviewComputedMessage1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(missingPreviewComputedMessage1.getMessageId());
softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -208,7 +209,7 @@ class MessageFastViewFactoryTest {
previewComputedMessage3.getMessageId(),
missingPreviewComputedMessage1.getMessageId(),
previewComputedMessage1.getMessageId()),
- session);
+ session).collectList().block();
assertThat(actual)
.hasSize(4)
@@ -221,16 +222,18 @@ class MessageFastViewFactoryTest {
}
@Test
- void fromMessageIdsShouldKeepProcessingEvenWhenFetchingFail() throws Exception {
- doThrow(new MailboxException("mock exception"))
+ void fromMessageIdsShouldKeepProcessingEvenWhenFetchingFail() {
+ doReturn(Flux.error(new MailboxException("mock exception")))
.doCallRealMethod()
- .when(messageIdManager).getMessages(any(), any(), any());
+ .when(messageIdManager).getMessagesReactive(any(), any(), any());
List<MessageFastView> actual = messageFastViewFactory
.fromMessageIds(ImmutableList.of(
missingPreviewComputedMessage1.getMessageId(),
previewComputedMessage1.getMessageId()),
- session);
+ session)
+ .collectList().block();
+
assertThat(actual)
.hasSize(1)
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
index 02b67d2..67eeb92 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageFullViewFactoryTest.java
@@ -132,7 +132,7 @@ class MessageFullViewFactoryTest {
@Test
void fromMessageResultsShouldReturnCorrectView() throws Exception {
- MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -163,7 +163,7 @@ class MessageFullViewFactoryTest {
messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
- MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageFullView actual = messageFullViewFactory.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
index b10f4fc..9eda7bc 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageHeaderViewFactoryTest.java
@@ -81,7 +81,7 @@ class MessageHeaderViewFactoryTest {
@Test
void fromMessageResultsShouldReturnCorrectView() throws Exception {
- MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -106,7 +106,7 @@ class MessageHeaderViewFactoryTest {
messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
- MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageHeaderView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
index 5297c72..19651ec 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/model/message/view/MessageMetadataViewFactoryTest.java
@@ -73,7 +73,7 @@ class MessageMetadataViewFactoryTest {
@Test
void fromMessageResultsShouldReturnCorrectView() throws Exception {
- MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getMailboxIds()).containsExactly(bobInbox.getId());
@@ -89,7 +89,7 @@ class MessageMetadataViewFactoryTest {
messageIdManager.setInMailboxes(message1.getMessageId(), ImmutableList.of(bobInbox.getId(), bobMailbox.getId()), session);
bobMailbox.setFlags(new Flags(Flags.Flag.FLAGGED), MessageManager.FlagsUpdateMode.REPLACE, MessageRange.all(), session);
- MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).get(0);
+ MessageMetadataView actual = testee.fromMessageIds(ImmutableList.of(message1.getMessageId()), session).collectList().block().get(0);
SoftAssertions.assertSoftly(softly -> {
softly.assertThat(actual.getId()).isEqualTo(message1.getMessageId());
softly.assertThat(actual.getKeywords()).isEqualTo(Keywords.strictFactory().from(Keyword.SEEN, Keyword.FLAGGED).asMap());
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
index 98794c1..d6affb2 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/event/ComputeMessageFastViewProjectionListenerTest.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap.event;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
@@ -65,6 +66,7 @@ import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class ComputeMessageFastViewProjectionListenerTest {
@@ -266,9 +268,8 @@ class ComputeMessageFastViewProjectionListenerTest {
@Test
void shouldStoreEventInDeadLettersWhenGetMessagesException() throws Exception {
- doThrow(new MailboxException())
- .when(messageIdManager)
- .getMessages(any(), any(), any());
+ doReturn(Flux.error(new MailboxException("mock exception")))
+ .when(messageIdManager).getMessagesReactive(any(), any(), any());
inboxMessageManager.appendMessage(
MessageManager.AppendCommand.builder()
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org