You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/08 00:40:04 UTC

[james-project] 04/08: JAMES-3575 Rewrite StoreMessageIdManager::setFlags in a more reactive way

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d4647112f6e71186f8f039c95e94d3425cfc4962
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 1 22:34:04 2021 +0700

    JAMES-3575 Rewrite StoreMessageIdManager::setFlags in a more reactive way
---
 .../james/mailbox/store/StoreMessageIdManager.java | 32 ++++++++++------------
 1 file changed, 14 insertions(+), 18 deletions(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 6ab1ffc..5031573 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -82,7 +82,6 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
 import reactor.core.publisher.Flux;
@@ -125,20 +124,19 @@ public class StoreMessageIdManager implements MessageIdManager {
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
         int concurrency = 4;
-        List<Mailbox> targetMailboxes = Flux.fromIterable(mailboxIds)
+
+        MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
             .flatMap(mailboxMapper::findMailboxById, concurrency)
             .collect(Guavate.toImmutableList())
-            .subscribeOn(Schedulers.elastic())
-            .block();
-
-        assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
+            .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
+                assertRightsOnMailboxes(targetMailboxes, mailboxSession, Right.Write);
 
-        Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
-            .subscribeOn(Schedulers.elastic())
-            .block();
-        for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
-            dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes);
-        }
+                return messageIdMapper.setFlags(messageId, mailboxIds, newState, replace)
+                    .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
+                    .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
+                    .then();
+            }).sneakyThrow())
+            .subscribeOn(Schedulers.elastic()));
     }
 
     @Override
@@ -389,8 +387,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                 .then());
     }
     
-    private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags,
-                                     List<Mailbox> knownMailboxes) throws MailboxException {
+    private Mono<Void> dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags, List<Mailbox> knownMailboxes) {
         if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
             Mailbox mailbox = knownMailboxes.stream()
                 .filter(knownMailbox -> knownMailbox.getMailboxId().equals(mailboxId))
@@ -399,16 +396,15 @@ public class StoreMessageIdManager implements MessageIdManager {
                         .findMailboxById(mailboxId)
                         .subscribeOn(Schedulers.elastic())))
                     .sneakyThrow());
-            eventBus.dispatch(EventFactory.flagsUpdated()
+            return eventBus.dispatch(EventFactory.flagsUpdated()
                         .randomEventId()
                         .mailboxSession(mailboxSession)
                         .mailbox(mailbox)
                         .updatedFlags(updatedFlags)
                         .build(),
-                    new MailboxIdRegistrationKey(mailboxId))
-                .subscribeOn(Schedulers.elastic())
-                .block();
+                    new MailboxIdRegistrationKey(mailboxId));
         }
+        return Mono.empty();
     }
 
     private void validateQuota(MessageMovesWithMailbox messageMoves, MailboxMessage mailboxMessage) throws MailboxException {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org