You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/02 03:15:58 UTC

[james-project] 01/29: MAILBOX-396 Some reactor improvement in CassandraMessageMapper

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9a1d36b71c1b6af7cc3a71b7635c6f32f5f7e5f7
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 7 14:14:58 2020 +0700

    MAILBOX-396 Some reactor improvement in CassandraMessageMapper
    
    Avoid calling several time 'block' upon `add`, `setInMailboxes`, `findInMailbox`
---
 .../cassandra/mail/CassandraMessageMapper.java     | 44 ++++++++++++----------
 1 file changed, 25 insertions(+), 19 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 695c38f..7fc68f1 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -171,17 +171,17 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return retrieveMessages(retrieveMessageIds(mailboxId, messageRange), ftype, Limit.from(max))
+        return retrieveMessageIds(mailboxId, messageRange)
+            .flatMapMany(ids -> retrieveMessages(ids, ftype, Limit.from(max)))
             .map(MailboxMessage.class::cast)
             .sort(Comparator.comparing(MailboxMessage::getUid))
             .toIterable()
             .iterator();
     }
 
-    private List<ComposedMessageIdWithMetaData> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
+    private Mono<List<ComposedMessageIdWithMetaData>> retrieveMessageIds(CassandraId mailboxId, MessageRange messageRange) {
         return messageIdDAO.retrieveMessages(mailboxId, messageRange)
-            .collect(Guavate.toImmutableList())
-            .block();
+            .collect(Guavate.toImmutableList());
     }
 
     private Flux<MailboxMessage> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
@@ -267,13 +267,15 @@ public class CassandraMessageMapper implements MessageMapper {
     public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        save(mailbox, addUidAndModseq(message, mailboxId))
-            .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
-            .block();
-        return message.metaData();
+        return block(addUidAndModseq(message, mailboxId)
+            .flatMap(Throwing.function(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
+                .thenReturn(messageWithUidAndModSeq)))
+            .flatMap(messageWithUidAndModSeq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
+                .thenReturn(messageWithUidAndModSeq))
+            .map(MailboxMessage::metaData));
     }
 
-    private MailboxMessage addUidAndModseq(MailboxMessage message, CassandraId mailboxId) throws MailboxException {
+    private Mono<MailboxMessage> addUidAndModseq(MailboxMessage message, CassandraId mailboxId) {
         Mono<MessageUid> messageUidMono = uidProvider
             .nextUid(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a UID to save " + message.getMessageId() + " in " + mailboxId)));
@@ -281,20 +283,23 @@ public class CassandraMessageMapper implements MessageMapper {
         Mono<ModSeq> nextModSeqMono = modSeqProvider.nextModSeq(mailboxId)
             .switchIfEmpty(Mono.error(() -> new MailboxException("Can not find a MODSEQ to save " + message.getMessageId() + " in " + mailboxId)));
 
-        try {
-            Mono.zip(messageUidMono, nextModSeqMono)
+        return Mono.zip(messageUidMono, nextModSeqMono)
                 .doOnNext(tuple -> {
                     message.setUid(tuple.getT1());
                     message.setModSeq(tuple.getT2());
                 })
-                .block();
+                .thenReturn(message);
+    }
+
+    private <T> T block(Mono<T> mono) throws MailboxException {
+        try {
+            return mono.block();
         } catch (RuntimeException e) {
             if (e.getCause() instanceof MailboxException) {
-                throw (MailboxException)e.getCause();
+                throw (MailboxException) e.getCause();
             }
             throw e;
         }
-        return message;
     }
 
     @Override
@@ -384,11 +389,12 @@ public class CassandraMessageMapper implements MessageMapper {
 
     private MessageMetaData setInMailbox(Mailbox mailbox, MailboxMessage message) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-
-        insertIds(addUidAndModseq(message, mailboxId), mailboxId)
-                .thenEmpty(indexTableHandler.updateIndexOnAdd(message, mailboxId))
-                .block();
-        return message.metaData();
+        return block(addUidAndModseq(message, mailboxId)
+            .flatMap(messageWithUidAndModseq -> insertIds(messageWithUidAndModseq, mailboxId)
+                .thenReturn(messageWithUidAndModseq))
+            .flatMap(messageWithUidAndModseq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
+                .thenReturn(messageWithUidAndModseq))
+            .map(MailboxMessage::metaData));
     }
 
     private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org