You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:44 UTC

[james-project] 10/16: [REFACTORING] Avoid intermediate collect when retrieving messages using JMAP

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 48a698e6afc12cce27ba0a6eb7d31cd3df9f5cf0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 12 10:05:37 2020 +0700

    [REFACTORING] Avoid intermediate collect when retrieving messages using JMAP
---
 .../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java  | 8 ++++++--
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java    | 6 ++----
 .../james/mailbox/cassandra/mail/CassandraMessageMapper.java      | 3 +--
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 621a685..d861ab9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -237,11 +237,15 @@ public class CassandraMessageDAO {
             .collect(Guavate.toImmutableList());
     }
 
+    public Mono<MessageResult> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
+        return retrieveRow(id, fetchType)
+                .flatMap(resultSet -> message(resultSet, id, fetchType));
+    }
+
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
             .publishOn(Schedulers.elastic())
-            .flatMap(id -> retrieveRow(id, fetchType)
-                .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
+            .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
     }
 
     private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 107e37b..8e07fda 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -45,7 +45,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,11 +95,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return Flux.fromStream(messageIds.stream())
             .publishOn(Schedulers.elastic())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
-            .collectList()
-            .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
+            .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
-            .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+            .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .groupBy(MailboxMessage::getMailboxId)
             .flatMap(this::keepMessageIfMailboxExists)
             .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7faa9cc..7f7a0b7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -220,8 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
         return retrieveComposedId(mailboxId, messageUid)
             .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
-            .flatMapMany(idWithMetadata ->
-                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
+            .flatMapMany(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata))
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org