You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:16:01 UTC
[james-project] 04/29: MAILBOX-396 Reduce query amount upon listing message
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2e92b18c914ce58350240585e758c5e3237f773e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 26 09:29:18 2020 +0700
MAILBOX-396 Reduce query amount upon listing message
This can be achieved by applying a "groupBy" on the mailbox ID before
keepMessageIfMailboxExists, so that the mailbox existence check performs only
one read per mailbox instead of one read per message.
---
.../cassandra/mail/CassandraMessageIdMapper.java | 20 +++++++++++---------
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3a206c3..750b6bd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -44,7 +44,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +52,7 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.collect.Multimap;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -95,20 +95,22 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.filter(CassandraMessageDAO.MessageResult::isFound)
.map(CassandraMessageDAO.MessageResult::message)
.flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+ .groupBy(MailboxMessage::getMailboxId)
.flatMap(this::keepMessageIfMailboxExists)
.collectSortedList(Comparator.comparing(MailboxMessage::getUid))
.block();
}
- private Mono<MailboxMessage> keepMessageIfMailboxExists(MailboxMessage message) {
- CassandraId cassandraId = (CassandraId) message.getMailboxId();
+ private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
+ CassandraId cassandraId = (CassandraId) groupedFlux.key();
return mailboxDAO.retrieveMailbox(cassandraId)
- .map(any -> message)
- .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> {
- LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
- cassandraId,
- message.getMailboxId());
- }));
+ .flatMapMany(any -> groupedFlux)
+ .switchIfEmpty(groupedFlux.map(message -> {
+ LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.",
+ cassandraId.serialize(),
+ message.getMessageId().serialize());
+ return message;
+ }).then(Mono.empty()));
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org