You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/06/02 04:54:23 UTC

[james-project] 02/04: [PERFORMANCE] SimpleMessageSearchIndex was iterating a blocking entity

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e6cee29953679d70e55d4c451b0e9ec896c66314
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun May 30 10:11:24 2021 +0700

    [PERFORMANCE] SimpleMessageSearchIndex was iterating a blocking entity
    
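    The blocking getSearchResultStream call was made directly inside the
    concatMap lambda, which prevented execution on the parallel scheduler.
    It is now wrapped in Mono.fromCallable and subscribed on the elastic
    scheduler.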
---
 .../apache/james/mailbox/store/search/SimpleMessageSearchIndex.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 63dc12c..edf1a10 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -59,6 +59,7 @@ import org.apache.james.util.streams.Iterators;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 /**
@@ -159,7 +160,9 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
 
     private Flux<? extends SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
-        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)))
+        return mailboxes.concatMap(mailbox -> Mono.fromCallable(() -> getSearchResultStream(session, query, mailbox))
+                .flatMapMany(Flux::fromStream)
+                .subscribeOn(Schedulers.elastic()))
             .collectSortedList(CombinedComparator.create(query.getSorts()))
             .flatMapMany(list -> Iterators.toFlux(new MessageSearches(list.iterator(), query, textExtractor, attachmentContentLoader, session).iterator()))
             .subscribeOn(Schedulers.elastic());

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org