You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:01 UTC

[james-project] 04/29: MAILBOX-396 Reduce query amount upon listing message

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2e92b18c914ce58350240585e758c5e3237f773e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 26 09:29:18 2020 +0700

    MAILBOX-396 Reduce query amount upon listing message
    
    This can be achieved by applying a "groupBy" on the mailbox id before
    keepMessageIfMailboxExists, so that only one read is performed per mailbox
    instead of one read per message.
---
 .../cassandra/mail/CassandraMessageIdMapper.java     | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3a206c3..750b6bd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -44,7 +44,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +52,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.GroupedFlux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -95,20 +95,22 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+            .groupBy(MailboxMessage::getMailboxId)
             .flatMap(this::keepMessageIfMailboxExists)
             .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
             .block();
     }
 
-    private Mono<MailboxMessage> keepMessageIfMailboxExists(MailboxMessage message) {
-        CassandraId cassandraId = (CassandraId) message.getMailboxId();
+    private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
+        CassandraId cassandraId = (CassandraId) groupedFlux.key();
         return mailboxDAO.retrieveMailbox(cassandraId)
-            .map(any -> message)
-            .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> {
-                    LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
-                        cassandraId,
-                        message.getMailboxId());
-                }));
+            .flatMapMany(any -> groupedFlux)
+            .switchIfEmpty(groupedFlux.map(message -> {
+                LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
+                    cassandraId.serialize(),
+                    message.getMessageId().serialize());
+                return message;
+            }).then(Mono.empty()));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org