You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/02 04:54:25 UTC

[james-project] 04/04: [PERFORMANCE] MailboxChangeListener should be fully reactive

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 119c9f34a6991c9c0db48245de89e53d8f6d0a55
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 20:08:31 2021 +0700

    [PERFORMANCE] MailboxChangeListener should be fully reactive
---
 .../org/apache/james/mailbox/MessageIdManager.java |  2 ++
 .../james/mailbox/store/StoreMessageIdManager.java | 32 +++++++++---------
 .../apache/james/jmap/api/change/EmailChange.java  | 39 +++++++++++-----------
 .../james/jmap/change/MailboxChangeListener.scala  |  5 +--
 4 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 4bb6060..8d42623 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -50,6 +50,8 @@ public interface MessageIdManager {
 
     Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, final MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, final MailboxSession mailboxSession);
+
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
     Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 47d44a0..71f7ab4 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -144,20 +144,22 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     @Override
-    public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+    public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
+        return accessibleMessagesReactive(messageIds, mailboxSession).block();
+    }
+
+    @Override
+    public Mono<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
-        ImmutableList<ComposedMessageIdWithMetaData> idList = Flux.fromIterable(messageIds)
+        return Flux.fromIterable(messageIds)
             .flatMap(messageIdMapper::findMetadata, DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableList())
-            .block();
-
-        ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, idList.stream()
-            .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read);
-
-        return idList.stream()
-            .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
-            .map(id -> id.getComposedMessageId().getMessageId())
-            .collect(Guavate.toImmutableSet());
+            .flatMap(idList -> getAllowedMailboxIds(mailboxSession, idList.stream()
+                .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read)
+                .map(allowedMailboxIds -> idList.stream()
+                    .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
+                    .map(id -> id.getComposedMessageId().getMessageId())
+                    .collect(Guavate.toImmutableSet())));
     }
 
     @Override
@@ -190,11 +192,7 @@ public class StoreMessageIdManager implements MessageIdManager {
             .flatMap(Function.identity(), DEFAULT_CONCURRENCY);
     }
 
-    private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException {
-        return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights));
-    }
-
-    private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
+    private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
         return Flux.fromStream(idList)
             .distinct()
             .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
@@ -229,7 +227,7 @@ public class StoreMessageIdManager implements MessageIdManager {
         return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
             .collectList()
             .flatMap(messageList ->
-                getAllowedMailboxIdsReactive(mailboxSession,
+                getAllowedMailboxIds(mailboxSession,
                     messageList
                         .stream()
                         .map(MailboxMessage::getMailboxId),
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index 6210da1..00e2733 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,7 +23,6 @@ import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -35,10 +34,8 @@ import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.events.MailboxEvents.Added;
 import org.apache.james.mailbox.events.MailboxEvents.Expunged;
 import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MessageId;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -46,6 +43,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EmailChange implements JmapChange {
     public static class Builder {
         @FunctionalInterface
@@ -185,28 +185,27 @@ public class EmailChange implements JmapChange {
                 .collect(Guavate.toImmutableList());
         }
 
-        public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) throws MailboxException {
+        public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) {
 
-            EmailChange ownerChange = fromExpunged(expunged, now, expunged.getUsername());
+            Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername());
 
-            Stream<EmailChange> shareeChanges = sharees.stream()
-                .map(Throwing.<Username, EmailChange>function(shareeId -> fromExpunged(expunged, now, shareeId)).sneakyThrow());
+            Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees)
+                .flatMap(shareeId -> fromExpunged(expunged, now, shareeId));
 
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
+            return Flux.concat(ownerChange, shareeChanges);
         }
 
-        private EmailChange fromExpunged(Expunged expunged, ZonedDateTime now, Username username) throws MailboxException {
-            Set<MessageId> accessibleMessageIds = messageIdManager.accessibleMessages(expunged.getMessageIds(), sessionProvider.createSystemSession(username));
-
-            return EmailChange.builder()
-                .accountId(AccountId.fromUsername(username))
-                .state(stateFactory.generate())
-                .date(now)
-                .isDelegated(false)
-                .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                .build();
+        private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) {
+            return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(),
+                sessionProvider.createSystemSession(username)))
+                .map(accessibleMessageIds -> EmailChange.builder()
+                    .accountId(AccountId.fromUsername(username))
+                    .state(stateFactory.generate())
+                    .date(now)
+                    .isDelegated(false)
+                    .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                    .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                    .build());
         }
     }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index b4c6aa9..e3cb5b9 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -90,8 +90,9 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
           .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala))
       case expunged: Expunged =>
         getSharees(mailboxId, username)
-          .flatMapIterable(sharees => mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala
-          .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala))
+          .flatMapMany(sharees => SFlux.concat(
+            SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala),
+            emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org