Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/02 04:54:21 UTC

[james-project] branch master updated (78f6db6 -> 119c9f3)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 78f6db6  JAMES-2813 Long running tasks on the MemoryTaskManager generates stackTraces
     new 752b042  [PERFORMANCE] Use reactive version of countMessagesInMailbox in MailboxManager::delete
     new e6cee29  [PERFORMANCE] SimpleMessageSearchIndex was iterating a blocking entity
     new 76e540f  [PERFORMANCE] MailboxACL.union shortcuts
     new 119c9f3  [PERFORMANCE] MailboxChangeListener should be fully reactive

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/james/mailbox/MessageIdManager.java |  2 ++
 .../org/apache/james/mailbox/model/MailboxACL.java |  3 ++
 .../james/mailbox/store/StoreMailboxManager.java   |  3 +-
 .../james/mailbox/store/StoreMessageIdManager.java | 32 +++++++++---------
 .../store/search/SimpleMessageSearchIndex.java     |  5 ++-
 .../apache/james/jmap/api/change/EmailChange.java  | 39 +++++++++++-----------
 .../james/jmap/change/MailboxChangeListener.scala  |  5 +--
 7 files changed, 48 insertions(+), 41 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/04: [PERFORMANCE] MailboxChangeListener should be fully reactive

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 119c9f34a6991c9c0db48245de89e53d8f6d0a55
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 20:08:31 2021 +0700

    [PERFORMANCE] MailboxChangeListener should be fully reactive
---
 .../org/apache/james/mailbox/MessageIdManager.java |  2 ++
 .../james/mailbox/store/StoreMessageIdManager.java | 32 +++++++++---------
 .../apache/james/jmap/api/change/EmailChange.java  | 39 +++++++++++-----------
 .../james/jmap/change/MailboxChangeListener.scala  |  5 +--
 4 files changed, 39 insertions(+), 39 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 4bb6060..8d42623 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -50,6 +50,8 @@ public interface MessageIdManager {
 
     Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, final MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, final MailboxSession mailboxSession);
+
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
     Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 47d44a0..71f7ab4 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -144,20 +144,22 @@ public class StoreMessageIdManager implements MessageIdManager {
     }
 
     @Override
-    public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException {
+    public Set<MessageId> accessibleMessages(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
+        return accessibleMessagesReactive(messageIds, mailboxSession).block();
+    }
+
+    @Override
+    public Mono<Set<MessageId>> accessibleMessagesReactive(Collection<MessageId> messageIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
-        ImmutableList<ComposedMessageIdWithMetaData> idList = Flux.fromIterable(messageIds)
+        return Flux.fromIterable(messageIds)
             .flatMap(messageIdMapper::findMetadata, DEFAULT_CONCURRENCY)
             .collect(Guavate.toImmutableList())
-            .block();
-
-        ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, idList.stream()
-            .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read);
-
-        return idList.stream()
-            .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
-            .map(id -> id.getComposedMessageId().getMessageId())
-            .collect(Guavate.toImmutableSet());
+            .flatMap(idList -> getAllowedMailboxIds(mailboxSession, idList.stream()
+                .map(id -> id.getComposedMessageId().getMailboxId()), Right.Read)
+                .map(allowedMailboxIds -> idList.stream()
+                    .filter(id -> allowedMailboxIds.contains(id.getComposedMessageId().getMailboxId()))
+                    .map(id -> id.getComposedMessageId().getMessageId())
+                    .collect(Guavate.toImmutableSet())));
     }
 
     @Override
@@ -190,11 +192,7 @@ public class StoreMessageIdManager implements MessageIdManager {
             .flatMap(Function.identity(), DEFAULT_CONCURRENCY);
     }
 
-    private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException {
-        return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights));
-    }
-
-    private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
+    private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) {
         return Flux.fromStream(idList)
             .distinct()
             .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY)
@@ -229,7 +227,7 @@ public class StoreMessageIdManager implements MessageIdManager {
         return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata)
             .collectList()
             .flatMap(messageList ->
-                getAllowedMailboxIdsReactive(mailboxSession,
+                getAllowedMailboxIds(mailboxSession,
                     messageList
                         .stream()
                         .map(MailboxMessage::getMailboxId),
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index 6210da1..00e2733 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,7 +23,6 @@ import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -35,10 +34,8 @@ import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.events.MailboxEvents.Added;
 import org.apache.james.mailbox.events.MailboxEvents.Expunged;
 import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MessageId;
 
-import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
@@ -46,6 +43,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EmailChange implements JmapChange {
     public static class Builder {
         @FunctionalInterface
@@ -185,28 +185,27 @@ public class EmailChange implements JmapChange {
                 .collect(Guavate.toImmutableList());
         }
 
-        public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) throws MailboxException {
+        public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) {
 
-            EmailChange ownerChange = fromExpunged(expunged, now, expunged.getUsername());
+            Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername());
 
-            Stream<EmailChange> shareeChanges = sharees.stream()
-                .map(Throwing.<Username, EmailChange>function(shareeId -> fromExpunged(expunged, now, shareeId)).sneakyThrow());
+            Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees)
+                .flatMap(shareeId -> fromExpunged(expunged, now, shareeId));
 
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
+            return Flux.concat(ownerChange, shareeChanges);
         }
 
-        private EmailChange fromExpunged(Expunged expunged, ZonedDateTime now, Username username) throws MailboxException {
-            Set<MessageId> accessibleMessageIds = messageIdManager.accessibleMessages(expunged.getMessageIds(), sessionProvider.createSystemSession(username));
-
-            return EmailChange.builder()
-                .accountId(AccountId.fromUsername(username))
-                .state(stateFactory.generate())
-                .date(now)
-                .isDelegated(false)
-                .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                .build();
+        private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) {
+            return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(),
+                sessionProvider.createSystemSession(username)))
+                .map(accessibleMessageIds -> EmailChange.builder()
+                    .accountId(AccountId.fromUsername(username))
+                    .state(stateFactory.generate())
+                    .date(now)
+                    .isDelegated(false)
+                    .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                    .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                    .build());
         }
     }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index b4c6aa9..e3cb5b9 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -90,8 +90,9 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
           .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala))
       case expunged: Expunged =>
         getSharees(mailboxId, username)
-          .flatMapIterable(sharees => mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala
-          .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala))
+          .flatMapMany(sharees => SFlux.concat(
+            SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala),
+            emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
     }
   }
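
The reshaped fromExpunged above no longer blocks on the accessibility lookup; it maps over the asynchronous result. A minimal sketch of that shape follows, using the JDK's CompletableFuture as a stand-in for Reactor's Mono so that it runs without extra dependencies. ExpungedSplitSketch, Split, accessibleAsync and the String message ids are hypothetical names for illustration, not James APIs.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

final class ExpungedSplitSketch {
    /** Hypothetical result holder: ids still readable elsewhere vs. ids gone for this account. */
    static final class Split {
        final Set<String> updated;
        final Set<String> destroyed;

        Split(Set<String> updated, Set<String> destroyed) {
            this.updated = updated;
            this.destroyed = destroyed;
        }
    }

    /** Stand-in for accessibleMessagesReactive: the lookup completes asynchronously. */
    static CompletableFuture<Set<String>> accessibleAsync(Set<String> ids, Set<String> readable) {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> result = new LinkedHashSet<>(ids);
            result.retainAll(readable);
            return result;
        });
    }

    /** Same shape as the new fromExpunged: transform the async result, never block on it. */
    static CompletableFuture<Split> fromExpunged(Set<String> expunged, Set<String> readable) {
        return accessibleAsync(expunged, readable).thenApply(accessible -> {
            Set<String> updated = new LinkedHashSet<>(expunged);
            updated.retainAll(accessible);   // expunged ids that remain accessible
            Set<String> destroyed = new LinkedHashSet<>(expunged);
            destroyed.removeAll(accessible); // expunged ids no longer accessible
            return new Split(updated, destroyed);
        });
    }

    public static void main(String[] args) {
        // join() appears only here, at the edge of the program, to print the result.
        Split split = fromExpunged(Set.of("m1", "m2", "m3"), Set.of("m2", "m9")).join();
        System.out.println("updated=" + split.updated + " destroyed=" + split.destroyed);
    }
}
```

Only the outermost caller waits for the value; in the commit that role is played by the event bus subscription rather than a join() or block() call.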
 



[james-project] 02/04: [PERFORMANCE] SimpleMessageSearchIndex was iterating a blocking entity

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e6cee29953679d70e55d4c451b0e9ec896c66314
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 10:11:24 2021 +0700

    [PERFORMANCE] SimpleMessageSearchIndex was iterating a blocking entity
    
    This prevented execution on the parallel scheduler.
---
 .../apache/james/mailbox/store/search/SimpleMessageSearchIndex.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 63dc12c..edf1a10 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -59,6 +59,7 @@ import org.apache.james.util.streams.Iterators;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -159,7 +160,9 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
 
     private Flux<? extends SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
-        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)))
+        return mailboxes.concatMap(mailbox -> Mono.fromCallable(() -> getSearchResultStream(session, query, mailbox))
+                .flatMapMany(Flux::fromStream)
+                .subscribeOn(Schedulers.elastic()))
             .collectSortedList(CombinedComparator.create(query.getSorts()))
             .flatMapMany(list -> Iterators.toFlux(new MessageSearches(list.iterator(), query, textExtractor, attachmentContentLoader, session).iterator()))
             .subscribeOn(Schedulers.elastic());
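
As read from the diff above, the fix wraps the blocking getSearchResultStream call so that it is evaluated lazily and on a scheduler meant for blocking work, instead of on whichever thread drives concatMap. The sketch below shows the same idea with JDK types only: CompletableFuture.supplyAsync with a dedicated executor stands in for Mono.fromCallable plus subscribeOn. DeferredBlockingSketch, blockingSearch and the "blocking-pool" thread name are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DeferredBlockingSketch {
    /** Hypothetical blocking lookup; it reports the thread it ran on so the hand-off is observable. */
    static List<String> blockingSearch(String mailbox) {
        return List.of(mailbox, Thread.currentThread().getName());
    }

    /** The blocking call is wrapped in a supplier and executed on the given pool, not on the caller. */
    static CompletableFuture<List<String>> searchAsync(String mailbox, ExecutorService blockingPool) {
        return CompletableFuture.supplyAsync(() -> blockingSearch(mailbox), blockingPool);
    }

    public static void main(String[] args) {
        ExecutorService blockingPool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "blocking-pool"));
        try {
            // The second element of the result is the name of the thread that did the blocking work.
            System.out.println(searchAsync("INBOX", blockingPool).join());
        } finally {
            blockingPool.shutdown();
        }
    }
}
```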



[james-project] 01/04: [PERFORMANCE] Use reactive version of countMessagesInMailbox in MailboxManager::delete

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 752b0422f1ac75de0a40d740da4873edec79b079
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 10:10:21 2021 +0700

    [PERFORMANCE] Use reactive version of countMessagesInMailbox in MailboxManager::delete
    
    This was causing blocking calls on the parallel scheduler and was failing.
---
 .../main/java/org/apache/james/mailbox/store/StoreMailboxManager.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index 89f6e0a..64ff6da 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -449,7 +449,8 @@ public class StoreMailboxManager implements MailboxManager {
         MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(session);
 
         Mono<QuotaRoot> quotaRootPublisher = Mono.fromCallable(() -> quotaRootResolver.getQuotaRoot(mailbox.generateAssociatedPath()));
-        Mono<Long> messageCountPublisher = Mono.fromCallable(() -> messageMapper.countMessagesInMailbox(mailbox));
+        Mono<Long> messageCountPublisher = Mono.from(messageMapper.getMailboxCountersReactive(mailbox))
+            .map(MailboxCounters::getCount);
 
         return quotaRootPublisher.zipWith(messageCountPublisher).flatMap(quotaRootWithMessageCount -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, UNLIMITED)
             .map(message -> MetadataWithMailboxId.from(message.metaData(), message.getMailboxId()))



[james-project] 03/04: [PERFORMANCE] MailboxACL.union shortcuts

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 76e540f75d12dd9375a1732773e6264d23361271
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 10:23:55 2021 +0700

    [PERFORMANCE] MailboxACL.union shortcuts
---
 .../api/src/main/java/org/apache/james/mailbox/model/MailboxACL.java   | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxACL.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxACL.java
index 5fb6637..65daef9 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxACL.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxACL.java
@@ -859,6 +859,9 @@ public class MailboxACL {
      * one of those.
      */
     public MailboxACL union(MailboxACL other) throws UnsupportedRightException {
+        if (entries.isEmpty()) {
+            return other;
+        }
         return new MailboxACL(
             Stream.concat(
                     this.entries.entrySet().stream(),
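
The shortcut added to union above skips the stream merge when the receiver has no entries. A minimal sketch of the same idea on a simplified, hypothetical ACL type follows; AclUnionSketch and its String-to-rights map are illustration only and do not reproduce MailboxACL. Returning other directly is safe here because the sketch's instances are never mutated after construction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class AclUnionSketch {
    /** Hypothetical simplified ACL: entry key to granted rights, fixed after construction. */
    private final Map<String, Set<Character>> entries;

    AclUnionSketch(Map<String, Set<Character>> entries) {
        this.entries = Map.copyOf(entries);
    }

    AclUnionSketch union(AclUnionSketch other) {
        if (entries.isEmpty()) {
            return other; // shortcut: nothing to merge, so no new instance is allocated
        }
        Map<String, Set<Character>> merged = new HashMap<>();
        entries.forEach((key, rights) -> merged.put(key, new TreeSet<>(rights)));
        other.entries.forEach((key, rights) ->
            merged.computeIfAbsent(key, k -> new TreeSet<>()).addAll(rights));
        return new AclUnionSketch(merged);
    }

    Map<String, Set<Character>> entries() {
        return entries;
    }

    public static void main(String[] args) {
        AclUnionSketch empty = new AclUnionSketch(Map.of());
        AclUnionSketch shared = new AclUnionSketch(Map.of("bob", Set.of('r')));
        // The empty receiver hands back the argument itself instead of building a copy.
        System.out.println(empty.union(shared) == shared);
    }
}
```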
