You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:15:59 UTC

[james-project] 02/29: MAILBOX-396 Windowing IMAP message reads

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 683df7b9bfd43afe2b51ce19ef200ca6fccbb924
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Feb 10 10:55:52 2020 +0700

    MAILBOX-396 Windowing IMAP message reads
    
    This prevents us from putting undue pressure on Cassandra during IMAP
    operations over large message ranges
---
 .../james/mailbox/cassandra/mail/CassandraMessageMapper.java       | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7fc68f1..6f4b7df 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -172,16 +172,17 @@ public class CassandraMessageMapper implements MessageMapper {
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return retrieveMessageIds(mailboxId, messageRange)
-            .flatMapMany(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
+            .flatMap(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
             .map(MailboxMessage.class::cast)
             .sort(Comparator.comparing(MailboxMessage::getUid))
             .toIterable()
             .iterator();
     }
 
-    private Mono<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
+    private Flux<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
         return messageIdDAO.retrieveMessages(mailboxId, messageRange)
-            .collect(Guavate.toImmutableList());
+            .window(cassandraConfiguration.getMessageReadChunkSize())
+            .flatMap(flux -> flux.collect(Guavate.toImmutableList()));
     }
 
     private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org