You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/16 02:11:58 UTC

[james-project] 03/03: JAMES-3149 Reactive GetMessageList

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 58af5c33e911053e9971ff142b26ff640a95a518
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 5 23:50:42 2020 +0700

    JAMES-3149 Reactive GetMessageList
---
 .../versions/CassandraSchemaVersionManager.java    | 22 ++---
 .../SessionWithInitializedTablesFactoryTest.java   | 14 ++--
 .../CassandraSchemaVersionManagerTest.java         |  6 +-
 .../org/apache/james/mailbox/MailboxManager.java   |  2 +-
 .../apache/james/mailbox/MailboxManagerTest.java   | 49 ++++++++----
 .../cassandra/mail/CassandraMailboxMapper.java     | 52 ++++++------
 .../cassandra/mail/CassandraMessageIdMapper.java   | 27 +++++--
 .../task/SolveMailboxInconsistenciesService.java   |  2 +-
 .../cassandra/mail/CassandraMailboxMapperTest.java | 93 ++++++++++++++--------
 .../ElasticSearchListeningMessageSearchIndex.java  |  9 +--
 .../search/ElasticSearchSearcherTest.java          |  3 +-
 .../james/mailbox/jpa/mail/JPAMailboxMapper.java   | 16 ++--
 .../jpa/mail/TransactionalMailboxMapper.java       |  5 +-
 .../lucene/search/LuceneMessageSearchIndex.java    | 10 ++-
 .../LuceneMailboxMessageSearchIndexTest.java       | 12 ++-
 .../mailbox/maildir/mail/MaildirMailboxMapper.java | 18 ++---
 .../inmemory/mail/InMemoryMailboxMapper.java       | 18 ++---
 .../james/vault/DeletedMessageVaultHook.java       | 14 ++--
 .../james/mailbox/store/StoreMailboxManager.java   | 53 ++++++------
 .../james/mailbox/store/mail/MailboxMapper.java    | 26 +++---
 .../store/quota/DefaultUserQuotaRootResolver.java  |  3 +-
 .../store/search/LazyMessageSearchIndex.java       |  4 +-
 .../mailbox/store/search/MessageSearchIndex.java   |  5 +-
 .../store/search/SimpleMessageSearchIndex.java     | 31 ++++----
 .../store/AbstractCombinationManagerTest.java      |  5 +-
 .../store/mail/model/MailboxMapperACLTest.java     | 24 +++---
 .../store/mail/model/MailboxMapperTest.java        | 44 ++--------
 .../quota/DefaultUserQuotaRootResolverTest.java    |  4 +-
 .../search/AbstractMessageSearchIndexTest.java     | 36 ++++++---
 .../CassandraSchemaVersionStartUpCheck.java        |  6 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |  4 +-
 .../cassandra/CassandraRecipientRewriteTable.java  |  2 +-
 .../jmap/draft/methods/GetMessageListMethod.java   | 71 +++++++++--------
 .../james/jmap/draft/methods/ReferenceUpdater.java |  5 +-
 .../mailet/ExtractMDNOriginalJMAPMessageId.java    |  4 +-
 .../routes/DeletedMessagesVaultRoutesTest.java     |  3 +-
 36 files changed, 378 insertions(+), 324 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index 9b47f39..516fbd3 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraSchemaVersionManager {
     public static final SchemaVersion MIN_VERSION = new SchemaVersion(5);
     public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
@@ -65,24 +67,26 @@ public class CassandraSchemaVersionManager {
         this.minVersion = minVersion;
         this.maxVersion = maxVersion;
 
-        this.initialSchemaVersion = computeVersion();
+        this.initialSchemaVersion = computeVersion().block();
     }
 
-    public boolean isBefore(SchemaVersion minimum) {
-        return initialSchemaVersion.isBefore(minimum)
+    public Mono<Boolean> isBefore(SchemaVersion minimum) {
+        if (initialSchemaVersion.isBefore(minimum)) {
             // If we started with a legacy james then maybe schema version had been updated since then
-            && computeVersion().isBefore(minimum);
+            return computeVersion()
+                .map(computedVersion -> computedVersion.isBefore(minimum));
+        }
+        return Mono.just(false);
     }
 
-    public SchemaVersion computeVersion() {
+    public Mono<SchemaVersion> computeVersion() {
         return schemaVersionDAO
             .getCurrentSchemaVersion()
-            .block()
-            .orElseGet(() -> {
+            .map(maybeVersion -> maybeVersion.orElseGet(() -> {
                 LOGGER.warn("No schema version information found on Cassandra, we assume schema is at version {}",
                     CassandraSchemaVersionManager.DEFAULT_VERSION);
                 return DEFAULT_VERSION;
-            });
+            }));
     }
 
     public SchemaVersion getMinimumSupportedVersion() {
@@ -94,7 +98,7 @@ public class CassandraSchemaVersionManager {
     }
 
     public SchemaState computeSchemaState() {
-        SchemaVersion version = computeVersion();
+        SchemaVersion version = computeVersion().block();
         if (version.isBefore(minVersion)) {
             return TOO_OLD;
         } else if (version.isBefore(maxVersion)) {
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
index 3c3345f..817fe85 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
@@ -83,39 +83,39 @@ class SessionWithInitializedTablesFactoryTest {
 
     @Test
     void createSessionShouldSetTheLatestSchemaVersionWhenCreatingTypesAndTables() {
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
     }
 
     @Test
     void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHaveNotChanged() {
         Session session = testee.get();
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
 
         new CassandraTableManager(MODULE, session).clearAllTables();
         versionManagerDAO(session).updateVersion(MIN_VERSION);
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
 
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
     }
 
     @Test
     void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHavePartiallyChanged() {
         Session session = testee.get();
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
 
         new CassandraTableManager(MODULE, session).clearAllTables();
         versionManagerDAO(session).updateVersion(MIN_VERSION);
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
         session.execute(SchemaBuilder.dropTable(TABLE_NAME));
         session.execute(SchemaBuilder.dropType(TYPE_NAME));
 
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
     }
 
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
index 64c498e..4fb33fb 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
@@ -74,7 +74,7 @@ class CassandraSchemaVersionManagerTest {
             minVersion,
             maxVersion);
 
-        assertThat(testee.isBefore(maxVersion)).isTrue();
+        assertThat(testee.isBefore(maxVersion).block()).isTrue();
     }
 
     @Test
@@ -89,7 +89,7 @@ class CassandraSchemaVersionManagerTest {
             minVersion,
             maxVersion);
 
-        assertThat(testee.isBefore(maxVersion)).isFalse();
+        assertThat(testee.isBefore(maxVersion).block()).isFalse();
     }
 
     @Test
@@ -107,7 +107,7 @@ class CassandraSchemaVersionManagerTest {
         when(schemaVersionDAO.getCurrentSchemaVersion())
             .thenReturn(Mono.just(Optional.of(maxVersion)));
 
-        assertThat(testee.isBefore(maxVersion)).isFalse();
+        assertThat(testee.isBefore(maxVersion).block()).isFalse();
     }
 
     @Test
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 6b83d9f..9f31163 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -251,7 +251,7 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
      * @param session
      *            the context for this call, not null
      */
-    List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
+    Publisher<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
 
     /**
      * Does the given mailbox exist?
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 5397ed9..f51c124 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -94,6 +94,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -1208,7 +1209,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .build();
 
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsOnly(cacahueteMessageId, pirouetteMessageId);
         }
 
@@ -1237,7 +1239,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsOnly(messageId);
         }
 
@@ -1264,7 +1267,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1284,7 +1288,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1305,7 +1310,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1325,7 +1331,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .notInMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1346,7 +1353,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .notInMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1375,7 +1383,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(searchedMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId);
         }
 
@@ -1850,9 +1859,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
         }
 
@@ -1889,9 +1900,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(composedMessageId1.getMessageId());
         }
 
@@ -1979,9 +1992,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
         }
 
@@ -2019,9 +2034,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1);
         }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index d74ce1c..5a15d9e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -20,9 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import java.time.Duration;
-import java.util.Collection;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -82,7 +80,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
         this.versionManager = versionManager;
     }
 
-    private boolean needMailboxPathV1Support() {
+    private Mono<Boolean> needMailboxPathV1Support() {
         return versionManager.isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
     }
 
@@ -96,12 +94,15 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     private Flux<Void> deletePath(Mailbox mailbox) {
-        if (needMailboxPathV1Support()) {
-            return Flux.merge(
-                mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
-                mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
-        }
-        return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+        return needMailboxPathV1Support()
+            .flatMapMany(needSupport -> {
+                if (needSupport) {
+                    return Flux.merge(
+                        mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
+                        mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+                }
+                return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+            });
     }
 
     @Override
@@ -144,11 +145,9 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) {
-        return Flux.fromIterable(mailboxIds)
-            .map(CassandraId.class::cast)
-            .concatMap(this::retrieveMailbox)
-            .toStream();
+    public Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+        CassandraId mailboxId = (CassandraId) id;
+        return retrieveMailbox(mailboxId);
     }
 
     private Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) {
@@ -164,24 +163,25 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
         String fixedNamespace = query.getFixedNamespace();
         Username fixedUser = query.getFixedUser();
 
         return listPaths(fixedNamespace, fixedUser)
             .filter(idAndPath -> query.isPathMatch(idAndPath.getMailboxPath()))
             .distinct(CassandraIdAndPath::getMailboxPath)
-            .concatMap(this::retrieveMailbox)
-            .collectList()
-            .block();
+            .concatMap(this::retrieveMailbox);
     }
 
     private Flux<CassandraIdAndPath> listPaths(String fixedNamespace, Username fixedUser) {
-        if (needMailboxPathV1Support()) {
-            return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
-                mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
-        }
-        return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+        return needMailboxPathV1Support()
+            .flatMapMany(needSupport -> {
+                if (needSupport) {
+                    return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
+                        mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
+                }
+                return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+            });
     }
 
     private Mono<Mailbox> retrieveMailbox(CassandraIdAndPath idAndPath) {
@@ -302,12 +302,10 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
         return userMailboxRightsDAO.listRightsForUser(userName)
             .filter(mailboxId -> mailboxId.getRight().contains(right))
             .map(Pair::getLeft)
-            .flatMap(this::retrieveMailbox)
-            .collectList()
-            .block();
+            .flatMap(this::retrieveMailbox);
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 47cd80b..107e37b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -32,6 +32,7 @@ import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxId;
@@ -48,6 +49,7 @@ import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.runnable.ThrowingRunnable;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
 
@@ -129,20 +131,31 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
-        mailboxMapper.findMailboxById(mailboxId);
-
-        messageDAO.save(mailboxMessage)
+        unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+            .then(messageDAO.save(mailboxMessage))
             .thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId))
-            .block();
+            .block());
     }
 
     @Override
     public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
-        mailboxMapper.findMailboxById(mailboxId);
+        unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+            .then(saveMessageMetadata(mailboxMessage, mailboxId))
+            .block());
+    }
 
-        saveMessageMetadata(mailboxMessage, mailboxId)
-            .block();
+    private void unbox(ThrowingRunnable runnable) throws MailboxNotFoundException {
+        try {
+            runnable.run();
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof MailboxNotFoundException) {
+                throw (MailboxNotFoundException) e.getCause();
+            }
+            throw e;
+        }
     }
 
     private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 5aeb181..a10fc6d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -375,7 +375,7 @@ public class SolveMailboxInconsistenciesService {
     }
 
     private void assertValidVersion() {
-        SchemaVersion version = versionManager.computeVersion();
+        SchemaVersion version = versionManager.computeVersion().block();
 
         boolean isVersionValid = version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index f6d1dc2..f8e6839 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -23,8 +23,6 @@ import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.mailbox.model.MailboxAssertingTool.softly;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 
 import java.util.List;
 
@@ -61,7 +59,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.runnable.ThrowingRunnable;
-import reactor.core.publisher.Mono;
 
 class CassandraMailboxMapperTest {
     private static final UidValidity UID_VALIDITY = UidValidity.of(52);
@@ -162,7 +159,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                         .isEqualTo(inboxRenamed);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
@@ -189,7 +187,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                         .isEqualTo(inboxRenamed);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
@@ -212,7 +211,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPath).block())
                         .isEqualTo(inbox);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inbox));
@@ -235,7 +235,8 @@ class CassandraMailboxMapperTest {
                         .isInstanceOf(MailboxNotFoundException.class);
                     softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                         .isEmpty();
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .isEmpty();
                 }));
             }
@@ -253,9 +254,11 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             });
         }
@@ -282,7 +285,8 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -304,7 +308,8 @@ class CassandraMailboxMapperTest {
             doQuietly(() -> testee.rename(inboxRenamed));
 
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -328,7 +333,8 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
                     .isInstanceOf(MailboxNotFoundException.class);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -353,7 +359,8 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -375,7 +382,8 @@ class CassandraMailboxMapperTest {
             doQuietly(() -> testee.rename(inboxRenamed));
 
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -399,7 +407,8 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
                     .isInstanceOf(MailboxNotFoundException.class);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -422,11 +431,13 @@ class CassandraMailboxMapperTest {
                     .doesNotThrowAnyException();
                 softly.assertThatCode(() -> testee.findMailboxByPath(inboxPath))
                     .doesNotThrowAnyException();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -468,11 +479,13 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -495,11 +508,13 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -525,9 +540,11 @@ class CassandraMailboxMapperTest {
                     .isInstanceOf(MailboxNotFoundException.class);
                     softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                         .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -556,13 +573,16 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                     .isEqualTo(inboxRenamed);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
@@ -592,14 +612,17 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                     .isEqualTo(inboxRenamed);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox ->
                         softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
@@ -763,7 +786,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -782,7 +806,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -799,7 +824,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -877,7 +903,8 @@ class CassandraMailboxMapperTest {
                 .username(USER)
                 .expression(Wildcard.INSTANCE)
                 .build()
-                .asUserBound()))
+                .asUserBound())
+            .collectList().block())
             .containsOnly(MAILBOX);
     }
 }
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index b64ecb3..a85467b 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -62,6 +62,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
@@ -118,11 +119,11 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
     
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
 
         if (mailboxIds.isEmpty()) {
-            return ImmutableList.of();
+            return Flux.empty();
         }
 
         return searcher.search(mailboxIds, searchQuery, Optional.empty())
@@ -130,9 +131,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             .map(SearchResult::getMessageId)
             .flatMap(Mono::justOrEmpty)
             .distinct()
-            .take(limit)
-            .collect(Guavate.toImmutableList())
-            .block();
+            .take(limit);
     }
 
     @Override
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
index f7d6564..fcaa97b 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
@@ -154,7 +154,8 @@ class ElasticSearchSearcherTest {
             .stream()
             .map(ComposedMessageId::getMessageId)
             .collect(Guavate.toImmutableList());
-        assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1))
+        assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1)
+            .collectList().block())
             .containsExactlyInAnyOrderElementsOf(expectedMessageIds);
     }
 
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
index 777edf3..c2d4514 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
@@ -48,8 +48,8 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -186,15 +186,13 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         try {
             String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
-            return findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
-                .getResultList()
-                .stream()
+            return Flux.fromIterable(findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
+                .getResultList())
                 .map(JPAMailbox::toMailbox)
-                .filter(query::matches)
-                .collect(Guavate.toImmutableList());
+                .filter(query::matches);
         } catch (PersistenceException e) {
             throw new MailboxException("Search of mailbox " + query + " failed", e);
         }
@@ -250,7 +248,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return ImmutableList.of();
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.empty();
     }
 }
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
index 30b4da7..787db5b 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.transaction.Mapper;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class TransactionalMailboxMapper implements MailboxMapper {
@@ -81,7 +82,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         return wrapped.findMailboxWithPathLike(query);
     }
 
@@ -106,7 +107,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
         return wrapped.findNonPersonalMailboxes(userName, right);
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index b93306b..793f4a0 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -123,6 +123,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 /**
  * Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index
  */
@@ -461,18 +463,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         if (mailboxIds.isEmpty()) {
-            return ImmutableList.of();
+            return Flux.empty();
         }
 
-        return searchMultimap(mailboxIds, searchQuery)
+        return Flux.fromIterable(searchMultimap(mailboxIds, searchQuery)
             .stream()
             .map(searchResult -> searchResult.getMessageId().get())
             .filter(SearchUtil.distinct())
             .limit(Long.valueOf(limit).intValue())
-            .collect(Guavate.toImmutableList());
+            .collect(Guavate.toImmutableList()));
     }
     
     private List<SearchResult> searchMultimap(Collection<MailboxId> mailboxIds, SearchQuery searchQuery) throws MailboxException {
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 6ff52fb..4fa87f4 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -310,7 +310,8 @@ class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains("My Body"));
 
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+            .collectList().block();
 
         assertThat(result).containsOnly(id1, id2);
     }
@@ -323,7 +324,8 @@ class LuceneMailboxMessageSearchIndexTest {
         List<MessageId> result = index.search(session,
                 ImmutableList.of(mailbox.getMailboxId(), mailbox3.getMailboxId()),
                 query,
-                LIMIT);
+                LIMIT)
+            .collectList().block();
 
         assertThat(result).containsOnly(id1);
     }
@@ -333,7 +335,8 @@ class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.all());
 
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+            .collectList().block();
 
         // The query is not limited to one mailbox and we have 5 indexed messages
         assertThat(result).hasSize(5);
@@ -345,7 +348,8 @@ class LuceneMailboxMessageSearchIndexTest {
         query.andCriteria(SearchQuery.all());
 
         int limit = 1;
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit)
+            .collectList().block();
 
         assertThat(result).hasSize(limit);
     }
diff --git a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
index d9b5c0d..05d9799 100644
--- a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
+++ b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
@@ -51,9 +51,7 @@ import org.apache.james.mailbox.store.transaction.NonTransactionalMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
-
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MaildirMailboxMapper extends NonTransactionalMapper implements MailboxMapper {
@@ -128,7 +126,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
     }
     
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
         final Pattern searchPattern = Pattern.compile("[" + MaildirStore.maildirDelimiter + "]"
                 + pathLike.replace(".", "\\.").replace(MaildirStore.WILDCARD, ".*"));
@@ -147,9 +145,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
             Mailbox mailbox = maildirStore.loadMailbox(session, root, query.getFixedNamespace(), query.getFixedUser(), "");
             mailboxList.add(0, mailbox);
         }
-        return mailboxList.stream()
-            .filter(query::matches)
-            .collect(Guavate.toImmutableList());
+        return Flux.fromIterable(mailboxList)
+            .filter(query::matches);
     }
 
     @Override
@@ -159,7 +156,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
             .userAndNamespaceFrom(mailbox.generateAssociatedPath())
             .expression(new PrefixedWildcard(mailbox.getName() + delimiter))
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
         return mailboxes.size() > 0;
     }
 
@@ -333,7 +331,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return ImmutableList.of();
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.empty();
     }
 }
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
index fc55302..77db71d 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
@@ -40,10 +40,10 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 
-import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class InMemoryMailboxMapper implements MailboxMapper {
@@ -84,12 +84,10 @@ public class InMemoryMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
-        return mailboxesByPath.values()
-            .stream()
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+        return Flux.fromIterable(mailboxesByPath.values())
             .filter(query::matches)
-            .map(Mailbox::new)
-            .collect(Guavate.toImmutableList());
+            .map(Mailbox::new);
     }
 
     @Override
@@ -166,11 +164,9 @@ public class InMemoryMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return mailboxesByPath.values()
-            .stream()
-            .filter(mailbox -> hasRightOn(mailbox, userName, right))
-            .collect(Guavate.toImmutableList());
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.fromIterable(mailboxesByPath.values())
+            .filter(mailbox -> hasRightOn(mailbox, userName, right));
     }
 
     private Boolean hasRightOn(Mailbox mailbox, Username userName, Right right) {
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
index a69b4bd..cdb5071 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
@@ -35,6 +35,7 @@ import org.apache.james.mailbox.MetadataWithMailboxId;
 import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.extension.PreDeletionHook;
+import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
@@ -154,18 +155,19 @@ public class DeletedMessageVaultHook implements PreDeletionHook {
             .flatMap(groupFlux -> groupFlux.reduce(DeletedMessageMailboxContext::combine));
     }
 
-    private Publisher<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
-        Username owner = retrieveMailboxUser(groupedFlux.key());
-        return groupedFlux.map(metadata -> new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId())));
+    private Flux<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
+        return retrieveMailboxUser(groupedFlux.key())
+            .flatMapMany(owner -> groupedFlux.map(metadata ->
+                new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId()))));
     }
 
     private Pair<MessageId, Username> toMessageIdUserPair(DeletedMessageMailboxContext deletedMessageMetadata) {
         return Pair.of(deletedMessageMetadata.getMessageId(), deletedMessageMetadata.getOwner());
     }
 
-    private Username retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
+    private Mono<Username> retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
         return mapperFactory.getMailboxMapper(session)
-            .findMailboxById(mailboxId)
-            .getUser();
+            .findMailboxByIdReactive(mailboxId)
+            .map(Mailbox::getUser);
     }
 }
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index c3d7c02..76f8bfb 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -93,6 +93,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -536,7 +537,7 @@ public class StoreMailboxManager implements MailboxManager {
             .build()
             .asUserBound();
         locker.executeWithLock(from, (LockAwareExecution<Void>) () -> {
-            List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query);
+            List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query).collectList().block();
             for (Mailbox sub : subMailboxes) {
                 String subOriginalName = sub.getName();
                 String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
@@ -596,7 +597,7 @@ public class StoreMailboxManager implements MailboxManager {
     }
 
     private List<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
-        List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right);
+        List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right).collectList().block();
 
         ImmutableMap<MailboxId, MailboxCounters> counters = getMailboxCounters(mailboxes, session)
             .stream()
@@ -614,16 +615,13 @@ public class StoreMailboxManager implements MailboxManager {
             .collect(Guavate.toImmutableList());
     }
 
-    private List<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
+    private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
-        Stream<Mailbox> baseMailboxes = mailboxMapper
-            .findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session))
-            .stream();
-        Stream<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
-        return Stream.concat(baseMailboxes, delegatedMailboxes)
+        Flux<Mailbox> baseMailboxes = mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session));
+        Flux<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
+        return Flux.merge(baseMailboxes, delegatedMailboxes)
             .distinct()
-            .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)))
-            .collect(Guavate.toImmutableList());
+            .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)));
     }
 
     static MailboxQuery.UserBound toSingleUserQuery(MailboxQuery mailboxQuery, MailboxSession mailboxSession) {
@@ -644,12 +642,12 @@ public class StoreMailboxManager implements MailboxManager {
             .build());
     }
 
-    private Stream<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
-                                                  Right right, MailboxSession session) throws MailboxException {
+    private Flux<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
+                                                Right right, MailboxSession session) {
         if (mailboxQuery.isPrivateMailboxes(session)) {
-            return Stream.of();
+            return Flux.empty();
         }
-        return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right).stream();
+        return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right);
     }
 
     private MailboxMetaData toMailboxMetadata(MailboxSession session, List<Mailbox> mailboxes, Mailbox mailbox, MailboxCounters counters) throws UnsupportedRightException {
@@ -677,32 +675,31 @@ public class StoreMailboxManager implements MailboxManager {
     }
 
     @Override
-    public List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
-        ImmutableSet<MailboxId> wantedMailboxesId =
-            getInMailboxes(expression.getInMailboxes(), session)
-                .filter(id -> !expression.getNotInMailboxes().contains(id))
-                .collect(Guavate.toImmutableSet());
-
-        return index.search(session, wantedMailboxesId, expression.getSearchQuery(), limit);
+    public Flux<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
+        return getInMailboxes(expression.getInMailboxes(), session)
+            .filter(id -> !expression.getNotInMailboxes().contains(id))
+            .collect(Guavate.toImmutableSet())
+            .flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit)));
     }
 
-    private Stream<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
-        if (inMailboxes.isEmpty()) {
+
+    private Flux<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+        if (inMailboxes.isEmpty()) {
             return getAllReadableMailbox(session);
         } else {
             return filterReadable(inMailboxes, session);
         }
     }
 
-    private Stream<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
+    private Flux<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
         return searchMailboxes(MailboxQuery.builder().matchesAllMailboxNames().build(), session, Right.Read)
-            .stream()
             .map(Mailbox::getMailboxId);
     }
 
-    private Stream<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
-        return mailboxSessionMapperFactory.getMailboxMapper(session)
-            .findMailboxesById(inMailboxes)
+    private Flux<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+        MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
+        return Flux.fromIterable(inMailboxes)
+            .concatMap(mailboxMapper::findMailboxByIdReactive)
             .filter(Throwing.<Mailbox>predicate(mailbox -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow())
             .map(Mailbox::getMailboxId);
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
index 7ace127..6c5e3b8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
@@ -18,9 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.stream.Stream;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
@@ -35,8 +33,7 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.transaction.Mapper;
 
-import com.github.fge.lambdas.Throwing;
-
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -87,26 +84,25 @@ public interface MailboxMapper extends Mapper {
     Mailbox findMailboxById(MailboxId mailboxId)
             throws MailboxException, MailboxNotFoundException;
 
-    default Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) throws MailboxException {
-        return mailboxIds.stream()
-            .flatMap(Throwing.<MailboxId, Stream<Mailbox>>function(id -> {
-                try {
-                    return Stream.of(findMailboxById(id));
-                } catch (MailboxNotFoundException e) {
-                    return Stream.empty();
-                }
-            }).sneakyThrow());
+    default Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+        try {
+            return Mono.justOrEmpty(findMailboxById(id));
+        } catch (MailboxNotFoundException e) {
+            return Mono.empty();
+        } catch (MailboxException e) {
+            return Mono.error(e);
+        }
     }
 
     /**
-     * Return a List of {@link Mailbox} for the given userName and matching the right
+     * Return a {@link Flux} of {@link Mailbox} for the given userName and matching the right
      */
-    List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException;
+    Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right);
 
     /**
-     * Return a List of {@link Mailbox} which name is like the given name
+     * Return a {@link Flux} of {@link Mailbox} whose name is like the given name
      */
-    List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
+    Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
             throws MailboxException;
 
     /**
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 9c29ac4..6ea1f88 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -118,6 +118,7 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
                 .user(Username.of(user))
                 .matchesAllMailboxNames()
                 .build()
-                .asUserBound());
+                .asUserBound())
+            .collectList().block();
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 4a06c64..6e75f26 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+
 /**
  * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
  * 
@@ -141,7 +143,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
     
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
         throw new UnsupportedSearchException();
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index eb1dcee..4873099 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.store.search;
 
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -34,6 +33,8 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.SearchQuery;
 
+import reactor.core.publisher.Flux;
+
 /**
  * An index which can be used to search for MailboxMessage UID's that match a {@link SearchQuery}.
  * 
@@ -50,7 +51,7 @@ public interface MessageSearchIndex {
     /**
      * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
      */
-    List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
+    Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
 
     EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities);
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 26b602d..ed6fd17 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -53,10 +53,11 @@ import org.apache.james.mailbox.store.mail.MessageMapperFactory;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 
 import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 /**
  * {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher}
  * to match them against the {@link SearchQuery}.
@@ -108,10 +109,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     @Override
     public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
-        return searchResults(session, ImmutableList.of(mailbox).stream(), query)
-            .stream()
+        return searchResults(session, Flux.just(mailbox), query)
             .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
-            .map(SearchResult::getMessageUid);
+            .map(SearchResult::getMessageUid)
+            .toStream();
     }
 
     private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
@@ -142,19 +143,17 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
-        MailboxMapper mailboxManager = mailboxMapperFactory.getMailboxMapper(session);
+    public Flux<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+        MailboxMapper mailboxMapper = mailboxMapperFactory.getMailboxMapper(session);
 
-        Stream<Mailbox> filteredMailboxes = mailboxIds
-            .stream()
-            .map(Throwing.function(mailboxManager::findMailboxById).sneakyThrow());
+        Flux<Mailbox> filteredMailboxes = Flux.fromIterable(mailboxIds)
+            .concatMap(Throwing.function(mailboxMapper::findMailboxByIdReactive).sneakyThrow());
 
         return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit);
     }
 
-    private List<SearchResult> searchResults(MailboxSession session, Stream<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
-        return mailboxes.flatMap(mailbox -> getSearchResultStream(session, query, mailbox))
-            .collect(Guavate.toImmutableList());
+    private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
+        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)));
     }
 
     private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) {
@@ -165,12 +164,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
         }
     }
 
-    private List<MessageId> getAsMessageIds(List<SearchResult> temp, long limit) {
-        return temp.stream()
-            .map(searchResult -> searchResult.getMessageId().get())
+    private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) {
+        return temp.map(searchResult -> searchResult.getMessageId().get())
             .filter(SearchUtil.distinct())
-            .limit(Long.valueOf(limit).intValue())
-            .collect(Guavate.toImmutableList());
+            .take(limit); // Flux#take accepts a long: no lossy narrowing of the limit to int
     }
 
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index 30dedcf..03f5900 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -52,6 +52,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public abstract class AbstractCombinationManagerTest {
 
     private static final int DEFAULT_MAXIMUM_LIMIT = 256;
@@ -163,7 +165,8 @@ public abstract class AbstractCombinationManagerTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
 
-        assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)).containsOnly(messageId);
+        assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            .collectList().block()).containsOnly(messageId);
     }
 
     @Test
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index d4dfaec..c8aef54 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -233,7 +233,7 @@ public abstract class MailboxMapperACLTest {
 
     @Test
     void findMailboxesShouldReturnEmptyWhenNone() throws MailboxException {
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)).isEmpty();
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()).isEmpty();
     }
 
     @Test
@@ -246,7 +246,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(rights)
                 .asReplacement());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read)).isEmpty();
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block()).isEmpty();
     }
 
     @Test
@@ -258,7 +258,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(rights)
                 .asAddition());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
@@ -278,10 +278,10 @@ public abstract class MailboxMapperACLTest {
                 .rights(newRights)
                 .asReplacement());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block())
             .containsOnly(benwaInboxMailbox);
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -302,7 +302,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(new Rfc4314Rights())
                 .build());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -321,7 +321,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(initialRights)
                 .asRemoval());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -336,7 +336,7 @@ public abstract class MailboxMapperACLTest {
                 .asReplacement());
         mailboxMapper.delete(benwaInboxMailbox);
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -349,9 +349,9 @@ public abstract class MailboxMapperACLTest {
             new MailboxACL.Entry(user1, new Rfc4314Rights(Right.Administer)),
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
             .containsOnly(benwaInboxMailbox);
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
@@ -367,7 +367,7 @@ public abstract class MailboxMapperACLTest {
         mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -383,7 +383,7 @@ public abstract class MailboxMapperACLTest {
         mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Write))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
index 3fdf808..998a862 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
@@ -193,7 +193,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(bobInboxMailbox);
     }
@@ -216,7 +217,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(benwaWorkMailbox, benwaWorkDoneMailbox, benwaWorkTodoMailbox);
     }
@@ -230,7 +232,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(benwaInboxMailbox);
     }
@@ -244,7 +247,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)).isEmpty();
+        assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block()).isEmpty();
     }
 
     @Test
@@ -253,38 +257,6 @@ public abstract class MailboxMapperTest {
         Mailbox actual = mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId());
         assertThat(actual).isEqualTo(benwaInboxMailbox);
     }
-
-    @Test
-    void findMailboxesByIdShouldReturnEmptyWhenNoIdSupplied() throws MailboxException {
-        createAll();
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of());
-
-        assertThat(mailboxes).isEmpty();
-    }
-
-    @Test
-    void findMailboxesByIdShouldReturnMailboxOfSuppliedId() throws MailboxException {
-        createAll();
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
-            benwaInboxMailbox.getMailboxId(),
-            benwaWorkMailbox.getMailboxId()));
-
-        assertThat(mailboxes).containsOnly(benwaWorkMailbox, benwaInboxMailbox);
-    }
-
-    @Test
-    void findMailboxesByIdShouldFilterOutNonExistingMailbox() throws MailboxException {
-        createAll();
-        mailboxMapper.delete(benwaWorkMailbox);
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
-            benwaInboxMailbox.getMailboxId(),
-            benwaWorkMailbox.getMailboxId()));
-
-        assertThat(mailboxes).containsOnly(benwaInboxMailbox);
-    }
     
     @Test
     void findMailboxByIdShouldFailWhenAbsent() throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
index d744e15..36cb715 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
@@ -42,7 +42,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import com.google.common.collect.Lists;
+import reactor.core.publisher.Flux;
 
 class DefaultUserQuotaRootResolverTest {
 
@@ -92,7 +92,7 @@ class DefaultUserQuotaRootResolverTest {
     void retrieveAssociatedMailboxesShouldWork() throws Exception {
         MailboxMapper mockedMapper = mock(MailboxMapper.class);
         when(mockedFactory.getMailboxMapper(MAILBOX_SESSION)).thenReturn(mockedMapper);
-        when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Lists.newArrayList(MAILBOX, MAILBOX_2));
+        when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Flux.just(MAILBOX, MAILBOX_2));
 
         assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION)).containsOnly(MAILBOX, MAILBOX_2);
     }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index 8271602..6da8aa0 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -248,7 +248,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .hasSize(12)
@@ -280,7 +281,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .containsOnly(m1.getMessageId(),
@@ -316,7 +318,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(result)
             .hasSize(limit);
@@ -329,7 +332,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .isEmpty();
@@ -353,7 +357,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(result)
                 .hasSize(limit);
@@ -549,7 +554,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
     }
@@ -558,7 +564,8 @@ public abstract class AbstractMessageSearchIndexTest {
     void multimailboxSearchShouldReturnUidOfMessageMarkedAsSeenInOneMailbox() throws MailboxException {
         SearchQuery searchQuery = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT);
+        List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(m6.getMessageId());
     }
@@ -571,7 +578,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m8.getMessageId());
     }
@@ -584,7 +592,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
     }
@@ -598,7 +607,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
         // Two messages matches this query : mOther and m6
 
         assertThat(actual).hasSize(1);
@@ -614,7 +624,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(otherMailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(actual).contains(m10.getMessageId());
     }
@@ -1408,7 +1419,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(m1.getMessageId(), m2.getMessageId(), m3.getMessageId(), m4.getMessageId(), m5.getMessageId(),
             m6.getMessageId(), m7.getMessageId(), m8.getMessageId(), m9.getMessageId(), mOther.getMessageId(), mailWithAttachment.getMessageId(), mailWithInlinedAttachment.getMessageId());
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
index 2c992cb..1cf5ebb 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
@@ -69,7 +69,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
     private CheckResult checkUpgradeAbleState() {
         String upgradeVersionMessage =
             String.format("Current schema version is %d. Recommended version is %d",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.warn(upgradeVersionMessage);
         return CheckResult.builder()
@@ -93,7 +93,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
         String versionExceedMaximumSupportedMessage =
             String.format("Current schema version is %d whereas the maximum supported version is %d. " +
                 "Recommended version is %d.",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.error(versionExceedMaximumSupportedMessage);
@@ -108,7 +108,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
         String versionToOldMessage =
             String.format("Current schema version is %d whereas minimum required version is %d. " +
                 "Recommended version is %d",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMinimumSupportedVersion().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.error(versionToOldMessage);
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index e272402..f9b866c 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 
+import reactor.core.publisher.Flux;
+
 public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     private static class FakeMessageSearchIndexGroup extends Group {
 
@@ -80,7 +82,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
         throw new NotImplementedException("not implemented");
     }
 
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 6dc233b..8167ace 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -97,7 +97,7 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
         Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()),
             "Not supported mapping of type %s", mapping.getType());
 
-        if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION)) {
+        if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION).block()) {
             return super.listSources(mapping);
         }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
index 19c7503..cd31826 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import static org.apache.james.util.ReactorUtils.context;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -39,7 +41,6 @@ import org.apache.james.jmap.draft.utils.FilterToSearchQuery;
 import org.apache.james.jmap.draft.utils.SortConverter;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxId.Factory;
 import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
@@ -47,10 +48,15 @@ import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class GetMessageListMethod implements Method {
 
     private static final long DEFAULT_POSITION = 0;
@@ -89,11 +95,18 @@ public class GetMessageListMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof GetMessageListRequest);
 
         GetMessageListRequest messageListRequest = (GetMessageListRequest) request;
 
+        return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
+            () -> process(methodCallId, mailboxSession, messageListRequest)
+                .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest))))
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    private MDCBuilder mdc(GetMessageListRequest messageListRequest) {
         return MDCBuilder.create()
             .addContext(MDCBuilder.ACTION, "GET_MESSAGE_LIST")
             .addContext("accountId", messageListRequest.getAccountId())
@@ -105,38 +118,30 @@ public class GetMessageListMethod implements Method {
             .addContext("filters", messageListRequest.getFilter())
             .addContext("sorts", messageListRequest.getSort())
             .addContext("isFetchMessage", messageListRequest.isFetchMessages())
-            .addContext("isCollapseThread", messageListRequest.isCollapseThreads())
-            .wrapArround(
-                () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
-                    () -> process(methodCallId, mailboxSession, messageListRequest)))
-            .get();
+            .addContext("isCollapseThread", messageListRequest.isCollapseThreads());
     }
 
-    private Stream<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
-        GetMessageListResponse messageListResponse = getMessageListResponse(messageListRequest, mailboxSession);
-        Stream<JmapResponse> jmapResponse = Stream.of(JmapResponse.builder().methodCallId(methodCallId)
-            .response(messageListResponse)
-            .responseName(RESPONSE_NAME)
-            .build());
-        return Stream.concat(jmapResponse,
-            processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession));
+    private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
+        return getMessageListResponse(messageListRequest, mailboxSession)
+                .flatMapMany(messageListResponse -> Flux.concat(
+                    Mono.just(JmapResponse.builder().methodCallId(methodCallId)
+                        .response(messageListResponse)
+                        .responseName(RESPONSE_NAME)
+                        .build()),
+                    processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession)));
     }
 
-    private GetMessageListResponse getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
-        GetMessageListResponse.Builder builder = GetMessageListResponse.builder();
-        try {
-            MultimailboxesSearchQuery searchQuery = convertToSearchQuery(messageListRequest);
-            Long postionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
-            mailboxManager.search(searchQuery,
-                mailboxSession,
-                postionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit))
-                .stream()
-                .skip(postionValue)
-                .forEach(builder::messageId);
-            return builder.build();
-        } catch (MailboxException e) {
-            throw new RuntimeException(e);
-        }
+    private Mono<GetMessageListResponse> getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
+        Mono<MultimailboxesSearchQuery> searchQuery = Mono.fromCallable(() -> convertToSearchQuery(messageListRequest))
+            .subscribeOn(Schedulers.parallel());
+        Long positionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
+        long limit = positionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit);
+
+        return searchQuery
+            .flatMapMany(Throwing.function(query -> mailboxManager.search(query, mailboxSession, limit)))
+            .skip(positionValue)
+            .reduce(GetMessageListResponse.builder(), GetMessageListResponse.Builder::messageId)
+            .map(GetMessageListResponse.Builder::build);
     }
 
     private MultimailboxesSearchQuery convertToSearchQuery(GetMessageListRequest messageListRequest) {
@@ -174,15 +179,15 @@ public class GetMessageListMethod implements Method {
                 });
     }
     
-    private Stream<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    private Flux<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
         if (shouldChainToGetMessages(messageListRequest)) {
             GetMessagesRequest getMessagesRequest = GetMessagesRequest.builder()
                     .ids(messageListResponse.getMessageIds())
                     .properties(messageListRequest.getFetchMessageProperties())
                     .build();
-            return getMessagesMethod.processToStream(getMessagesRequest, methodCallId, mailboxSession);
+            return getMessagesMethod.process(getMessagesRequest, methodCallId, mailboxSession);
         }
-        return Stream.empty();
+        return Flux.empty();
     }
 
     private boolean shouldChainToGetMessages(GetMessageListRequest messageListRequest) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index dca90fc..4abc540 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -49,6 +49,8 @@ import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 public class ReferenceUpdater {
     public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id";
     public static final Flags FORWARDED_FLAG = new Flags("$Forwarded");
@@ -90,7 +92,8 @@ public class ReferenceUpdater {
         MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
             .from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
             .build();
-        List<MessageId> references = mailboxManager.search(searchByRFC822MessageId, session, limit);
+        List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
+            .collectList().block();
         try {
             MessageId reference = Iterables.getOnlyElement(references);
             List<MailboxId> mailboxIds = messageIdManager.getMessage(reference, FetchGroup.MINIMAL, session).stream()
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
index 1679fd9..823867d 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 /**
  * This mailet handles MDN messages and define a header X-JAMES-MDN-JMAP-MESSAGE-ID referencing
  * the original message (by its Jmap Id) asking for the recipient to send an MDN.
@@ -107,7 +109,7 @@ public class ExtractMDNOriginalJMAPMessageId extends GenericMailet {
             MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
                 .from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
                 .build();
-            return mailboxManager.search(searchByRFC822MessageId, session, limit).stream().findFirst();
+            return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)).toStream().findFirst();
         } catch (MailboxException | UsersRepositoryException e) {
             LOGGER.error("unable to find message with Message-Id: " + messageId, e);
         }
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index 5990161..3e7dcb7 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -2262,7 +2262,8 @@ class DeletedMessagesVaultRoutesTest {
         MailboxSession session = mailboxManager.createSystemSession(username);
         int limitToOneMessage = 1;
 
-        return !mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage)
+        return !Flux.from(mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage))
+            .collectList().block()
             .isEmpty();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org