You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/02 04:54:25 UTC
[james-project] 04/04: [PERFORMANCE] MailboxChangeListener should
be fully reactive
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 119c9f34a6991c9c0db48245de89e53d8f6d0a55
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 20:08:31 2021 +0700
[PERFORMANCE] MailboxChangeListener should be fully reactive
---
.../org/apache/james/mailbox/MessageIdManager.java | 2 ++
.../james/mailbox/store/StoreMessageIdManager.java | 32 +++++++++---------
.../apache/james/jmap/api/change/EmailChange.java | 39 +++++++++++-----------
.../james/jmap/change/MailboxChangeListener.scala | 5 +--
4 files changed, 39 insertions(+), 39 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 4bb6060..8d42623 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -50,6 +50,8 @@ public interface MessageIdManager {
Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, final MailboxSession mailboxSession) throws MailboxException;
+ Publisher<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, final MailboxSession mailboxSession);
+
void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 47d44a0..71f7ab4 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -144,20 +144,22 @@ public class StoreMessageIdManager implements MessageIdManager {
}
@Override
- public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+ public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
+ return accessibleMessagesReactive(messageIds, mailboxSession).block();
+ }
+
+ @Override
+ public Mono<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
- ImmutableList<ComposedMessageIdWithMetaData> idList = Flux.fromIterable(messageIds)
+ return Flux.fromIterable(messageIds)
.flatMap(messageIdMapper::findMetadata, DEFAULT_CONCURRENCY)
.collect(Guavate.toImmutableList())
- .block();
-
- ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, idList.stream()
- .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read);
-
- return idList.stream()
- .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
- .map(id -> id.getComposedMessageId().getMessageId())
- .collect(Guavate.toImmutableSet());
+ .flatMap(idList -> getAllowedMailboxIds(mailboxSession, idList.stream()
+ .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read)
+ .map(allowedMailboxIds -> idList.stream()
+ .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
+ .map(id -> id.getComposedMessageId().getMessageId())
+ .collect(Guavate.toImmutableSet())));
}
@Override
@@ -190,11 +192,7 @@ public class StoreMessageIdManager implements MessageIdManager {
.flatMap(Function.identity(), DEFAULT_CONCURRENCY);
}
- private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException {
- return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights));
- }
-
- private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
+ private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
return Flux.fromStream(idList)
.distinct()
.filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
@@ -229,7 +227,7 @@ public class StoreMessageIdManager implements MessageIdManager {
return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
.collectList()
.flatMap(messageList ->
- getAllowedMailboxIdsReactive(mailboxSession,
+ getAllowedMailboxIds(mailboxSession,
messageList
.stream()
.map(MailboxMessage::getMailboxId),
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index 6210da1..00e2733 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,7 +23,6 @@ import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Stream;
import javax.inject.Inject;
@@ -35,10 +34,8 @@ import org.apache.james.mailbox.SessionProvider;
import org.apache.james.mailbox.events.MailboxEvents.Added;
import org.apache.james.mailbox.events.MailboxEvents.Expunged;
import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
-import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MessageId;
-import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
@@ -46,6 +43,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public class EmailChange implements JmapChange {
public static class Builder {
@FunctionalInterface
@@ -185,28 +185,27 @@ public class EmailChange implements JmapChange {
.collect(Guavate.toImmutableList());
}
- public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) throws MailboxException {
+ public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) {
- EmailChange ownerChange = fromExpunged(expunged, now, expunged.getUsername());
+ Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername());
- Stream<EmailChange> shareeChanges = sharees.stream()
- .map(Throwing.<Username, EmailChange>function(shareeId -> fromExpunged(expunged, now, shareeId)).sneakyThrow());
+ Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees)
+ .flatMap(shareeId -> fromExpunged(expunged, now, shareeId));
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
+ return Flux.concat(ownerChange, shareeChanges);
}
- private EmailChange fromExpunged(Expunged expunged, ZonedDateTime now, Username username) throws MailboxException {
- Set<MessageId> accessibleMessageIds = messageIdManager.accessibleMessages(expunged.getMessageIds(), sessionProvider.createSystemSession(username));
-
- return EmailChange.builder()
- .accountId(AccountId.fromUsername(username))
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(false)
- .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
- .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
- .build();
+ private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) {
+ return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(),
+ sessionProvider.createSystemSession(username)))
+ .map(accessibleMessageIds -> EmailChange.builder()
+ .accountId(AccountId.fromUsername(username))
+ .state(stateFactory.generate())
+ .date(now)
+ .isDelegated(false)
+ .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+ .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+ .build());
}
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index b4c6aa9..e3cb5b9 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -90,8 +90,9 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
.concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala))
case expunged: Expunged =>
getSharees(mailboxId, username)
- .flatMapIterable(sharees => mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala
- .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala))
+ .flatMapMany(sharees => SFlux.concat(
+ SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala),
+ emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org