You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/05/17 02:21:51 UTC

[james-project] 08/11: [REFACTORING] StoreMessageIdManager::setInMailboxes should not block

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 07250a7684737e2d6b8972a6a3e51ff3ba0d5b58
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu May 13 16:30:59 2021 +0700

    [REFACTORING] StoreMessageIdManager::setInMailboxes should not block
---
 .../james/mailbox/store/StoreMessageIdManager.java | 82 ++++++++++++----------
 1 file changed, 43 insertions(+), 39 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index b1ac262..11ca8ff 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -39,9 +39,7 @@ import org.apache.james.events.EventBus;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.MetadataWithMailboxId;
-import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.RightManager;
 import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
 import org.apache.james.mailbox.exception.MailboxException;
@@ -56,6 +54,7 @@ import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxACL.Right;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageMoves;
 import org.apache.james.mailbox.model.MessageResult;
 import org.apache.james.mailbox.model.QuotaRoot;
@@ -359,8 +358,7 @@ public class StoreMessageIdManager implements MessageIdManager {
             .collect(Guavate.toImmutableList());
 
         return validateQuota(messageMoves, mailboxMessage.get())
-            .then(Mono.fromRunnable(Throwing.runnable(() ->
-                addMessageToMailboxes(mailboxMessage.get(), messageMoves.addedMailboxes(), mailboxSession)).sneakyThrow()))
+            .then(addMessageToMailboxes(mailboxMessage.get(), messageMoves.addedMailboxes(), mailboxSession))
             .then(removeMessageFromMailboxes(mailboxMessage.get().getMessageId(), messagesToRemove, mailboxSession))
             .then(eventBus.dispatch(EventFactory.moved()
                     .session(mailboxSession)
@@ -456,45 +454,51 @@ public class StoreMessageIdManager implements MessageIdManager {
         }
     }
 
-    private void addMessageToMailboxes(MailboxMessage mailboxMessage, Set<Mailbox> mailboxes, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<Void> addMessageToMailboxes(MailboxMessage mailboxMessage, Set<Mailbox> mailboxes, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
-        for (Mailbox mailbox : mailboxes) {
-            MailboxACL.Rfc4314Rights myRights = rightManager.myRights(mailbox, mailboxSession);
-            boolean shouldPreserveFlags = myRights.contains(Right.Write);
-            SimpleMailboxMessage copy =
-                SimpleMailboxMessage.from(mailboxMessage)
-                    .mailboxId(mailbox.getMailboxId())
-                    .flags(
-                        FlagsFactory
-                            .builder()
-                            .flags(mailboxMessage.createFlags())
-                            .filteringFlags(
-                                FlagsFilter.builder()
-                                    .systemFlagFilter(f -> shouldPreserveFlags)
-                                    .userFlagFilter(f -> shouldPreserveFlags)
-                                    .build())
-                            .build())
-                    .build();
-            save(messageIdMapper, copy, mailbox);
-
-            eventBus.dispatch(EventFactory.added()
-                    .randomEventId()
-                    .mailboxSession(mailboxSession)
-                    .mailbox(mailbox)
-                    .addMetaData(copy.metaData())
-                    .build(),
-                    new MailboxIdRegistrationKey(mailbox.getMailboxId()))
-                .block();
-        }
+        return Flux.fromIterable(mailboxes)
+            .flatMap(Throwing.<Mailbox, Mono<Void>>function(mailbox -> {
+                MailboxACL.Rfc4314Rights myRights = rightManager.myRights(mailbox, mailboxSession);
+                boolean shouldPreserveFlags = myRights.contains(Right.Write);
+                SimpleMailboxMessage copy =
+                    SimpleMailboxMessage.from(mailboxMessage)
+                        .mailboxId(mailbox.getMailboxId())
+                        .flags(
+                            FlagsFactory
+                                .builder()
+                                .flags(mailboxMessage.createFlags())
+                                .filteringFlags(
+                                    FlagsFilter.builder()
+                                        .systemFlagFilter(f -> shouldPreserveFlags)
+                                        .userFlagFilter(f -> shouldPreserveFlags)
+                                        .build())
+                                .build())
+                        .build();
+
+                return save(messageIdMapper, copy, mailbox)
+                    .flatMap(metadata -> eventBus.dispatch(EventFactory.added()
+                            .randomEventId()
+                            .mailboxSession(mailboxSession)
+                            .mailbox(mailbox)
+                            .addMetaData(metadata)
+                            .build(),
+                        new MailboxIdRegistrationKey(mailbox.getMailboxId())));
+            }).sneakyThrow())
+            .then();
     }
 
-    private void save(MessageIdMapper messageIdMapper, MailboxMessage mailboxMessage, Mailbox mailbox) throws MailboxException {
-        ModSeq modSeq = mailboxSessionMapperFactory.getModSeqProvider().nextModSeq(mailbox.getMailboxId());
-        MessageUid uid = mailboxSessionMapperFactory.getUidProvider().nextUid(mailbox.getMailboxId());
-        mailboxMessage.setModSeq(modSeq);
-        mailboxMessage.setUid(uid);
-        messageIdMapper.copyInMailbox(mailboxMessage, mailbox);
+    private Mono<MessageMetaData> save(MessageIdMapper messageIdMapper, MailboxMessage mailboxMessage, Mailbox mailbox) {
+        return Mono.zip(
+                mailboxSessionMapperFactory.getModSeqProvider().nextModSeqReactive(mailbox.getMailboxId()),
+                mailboxSessionMapperFactory.getUidProvider().nextUidReactive(mailbox.getMailboxId()))
+            .flatMap(modSeqAndUid -> {
+                mailboxMessage.setModSeq(modSeqAndUid.getT1());
+                mailboxMessage.setUid(modSeqAndUid.getT2());
+
+                return messageIdMapper.copyInMailboxReactive(mailboxMessage, mailbox)
+                    .thenReturn(mailboxMessage.metaData());
+            });
     }
 
     private ThrowingFunction<MailboxMessage, MessageResult> messageResultConverter(FetchGroup fetchGroup) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org