You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:44 UTC
[james-project] 10/16: [REFACTORING] Avoid intermediate collect when retrieving messages using JMAP
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 48a698e6afc12cce27ba0a6eb7d31cd3df9f5cf0
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 12 10:05:37 2020 +0700
[REFACTORING] Avoid intermediate collect when retrieving messages using JMAP
---
.../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java | 8 ++++++--
.../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java | 6 ++----
.../james/mailbox/cassandra/mail/CassandraMessageMapper.java | 3 +--
3 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 621a685..d861ab9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -237,11 +237,15 @@ public class CassandraMessageDAO {
.collect(Guavate.toImmutableList());
}
+ public Mono<MessageResult> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
+ return retrieveRow(id, fetchType)
+ .flatMap(resultSet -> message(resultSet, id, fetchType));
+ }
+
public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
.publishOn(Schedulers.elastic())
- .flatMap(id -> retrieveRow(id, fetchType)
- .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
+ .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
}
private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 107e37b..8e07fda 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -45,7 +45,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,11 +95,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
return Flux.fromStream(messageIds.stream())
.publishOn(Schedulers.elastic())
.flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
- .collectList()
- .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
+ .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message)
- .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+ .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
.groupBy(MailboxMessage::getMailboxId)
.flatMap(this::keepMessageIfMailboxExists)
.collectSortedList(Comparator.comparing(MailboxMessage::getUid))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7faa9cc..7f7a0b7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -220,8 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
return retrieveComposedId(mailboxId, messageUid)
.flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
- .flatMapMany(idWithMetadata ->
- messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
+ .flatMapMany(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata))
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message)
.map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org