You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/16 02:11:55 UTC

[james-project] branch master updated (6507a5c -> 58af5c3)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git.


    from 6507a5c  JAMES-2888: add BadRequestException removed by rebase work
     new c4d5620  [ADR] Distributed blob garbage collector
     new 4babd0b  JAMES-3058 SolveMailboxInconbsistencies: Flux.merge leads to undeterministic results
     new 58af5c3  JAMES-3149 Reactive GetMessageList

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../versions/CassandraSchemaVersionManager.java    |  22 +-
 .../SessionWithInitializedTablesFactoryTest.java   |  14 +-
 .../CassandraSchemaVersionManagerTest.java         |   6 +-
 .../org/apache/james/mailbox/MailboxManager.java   |   2 +-
 .../apache/james/mailbox/MailboxManagerTest.java   |  49 ++-
 .../cassandra/mail/CassandraMailboxMapper.java     |  52 ++-
 .../cassandra/mail/CassandraMessageIdMapper.java   |  27 +-
 .../task/SolveMailboxInconsistenciesService.java   |   4 +-
 .../cassandra/mail/CassandraMailboxMapperTest.java |  93 +++--
 .../SolveMailboxInconsistenciesServiceTest.java    |   3 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |   9 +-
 .../search/ElasticSearchSearcherTest.java          |   3 +-
 .../james/mailbox/jpa/mail/JPAMailboxMapper.java   |  16 +-
 .../jpa/mail/TransactionalMailboxMapper.java       |   5 +-
 .../lucene/search/LuceneMessageSearchIndex.java    |  10 +-
 .../LuceneMailboxMessageSearchIndexTest.java       |  12 +-
 .../mailbox/maildir/mail/MaildirMailboxMapper.java |  18 +-
 .../inmemory/mail/InMemoryMailboxMapper.java       |  18 +-
 .../james/vault/DeletedMessageVaultHook.java       |  14 +-
 .../james/mailbox/store/StoreMailboxManager.java   |  53 ++-
 .../james/mailbox/store/mail/MailboxMapper.java    |  26 +-
 .../store/quota/DefaultUserQuotaRootResolver.java  |   3 +-
 .../store/search/LazyMessageSearchIndex.java       |   4 +-
 .../mailbox/store/search/MessageSearchIndex.java   |   5 +-
 .../store/search/SimpleMessageSearchIndex.java     |  31 +-
 .../store/AbstractCombinationManagerTest.java      |   5 +-
 .../store/mail/model/MailboxMapperACLTest.java     |  24 +-
 .../store/mail/model/MailboxMapperTest.java        |  44 +-
 .../quota/DefaultUserQuotaRootResolverTest.java    |   4 +-
 .../search/AbstractMessageSearchIndexTest.java     |  36 +-
 .../CassandraSchemaVersionStartUpCheck.java        |   6 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |   4 +-
 .../cassandra/CassandraRecipientRewriteTable.java  |   2 +-
 .../jmap/draft/methods/GetMessageListMethod.java   |  71 ++--
 .../james/jmap/draft/methods/ReferenceUpdater.java |   5 +-
 .../mailet/ExtractMDNOriginalJMAPMessageId.java    |   4 +-
 .../routes/DeletedMessagesVaultRoutesTest.java     |   3 +-
 src/adr/0029-distributed-blob-garbage-collector.md | 450 +++++++++++++++++++++
 38 files changed, 831 insertions(+), 326 deletions(-)
 create mode 100644 src/adr/0029-distributed-blob-garbage-collector.md


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 02/03: JAMES-3058 SolveMailboxInconbsistencies: Flux.merge leads to undeterministic results

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4babd0b48025f2595b7e936cb21bd6a0b5170892
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Apr 14 15:08:21 2020 +0700

    JAMES-3058 SolveMailboxInconbsistencies: Flux.merge leads to undeterministic results
    
    Depending on wether for a given mailbox we fix possible inconsistencies
    by path first or by id first, the result won't be the same.
    
    Flux merge interleave publishers leading to such undeterministic results
    while Flux.concat avoid such issues.
---
 .../cassandra/mail/task/SolveMailboxInconsistenciesService.java        | 2 +-
 .../cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java    | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 4777ed8..5aeb181 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -368,7 +368,7 @@ public class SolveMailboxInconsistenciesService {
 
     Mono<Result> fixMailboxInconsistencies(Context context) {
         assertValidVersion();
-        return Flux.merge(
+        return Flux.concat(
                 processMailboxDaoInconsistencies(context),
                 processMailboxPathDaoInconsistencies(context))
             .reduce(Result.COMPLETED, Task::combine);
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
index 5b9b1ec..491dce4 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesServiceTest.java
@@ -202,6 +202,7 @@ class SolveMailboxInconsistenciesServiceTest {
         assertThat(context.snapshot())
             .isEqualTo(Context.builder()
                 .processedMailboxEntries(1)
+                .processedMailboxPathEntries(1)
                 .addFixedInconsistencies(MAILBOX.getMailboxId())
                 .build()
                 .snapshot());
@@ -253,7 +254,7 @@ class SolveMailboxInconsistenciesServiceTest {
         assertThat(context.snapshot())
             .isEqualTo(Context.builder()
                 .processedMailboxEntries(1)
-                .processedMailboxPathEntries(1)
+                .processedMailboxPathEntries(2)
                 .addFixedInconsistencies(CASSANDRA_ID_1)
                 .addConflictingEntry(ConflictingEntry.builder()
                     .mailboxDaoEntry(MAILBOX)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 01/03: [ADR] Distributed blob garbage collector

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c4d5620ecddceec23f499c42f9829265580502bb
Author: RĂ©mi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Feb 18 13:43:30 2020 +0100

    [ADR] Distributed blob garbage collector
---
 src/adr/0029-distributed-blob-garbage-collector.md | 450 +++++++++++++++++++++
 1 file changed, 450 insertions(+)

diff --git a/src/adr/0029-distributed-blob-garbage-collector.md b/src/adr/0029-distributed-blob-garbage-collector.md
new file mode 100644
index 0000000..36b3bdc
--- /dev/null
+++ b/src/adr/0029-distributed-blob-garbage-collector.md
@@ -0,0 +1,450 @@
+# 29. Distributed blob garbage collector
+
+Date: 2020-02-18
+
+## Status
+
+Proposed
+
+## Context
+
+The body, headers, attachments of the mails are stored as blobs in a blob store.
+In order to save space in those stores, those blobs are de-duplicated using a hash of their content.
+To attain that the current blob store will read the content of the blob before saving it, and generate its id based on
+a hash of this content. This way two blobs with the same content will share the same id and thus be saved only once.
+This makes the safe deletion of one of those blobs a non trivial problem as we can't delete one blob without ensuring
+that all references to it are themselves deleted. For example if two messages share the same blob, when we delete
+one message there is at the time being no way to tell if the blob is still referenced by another message.
+
+## Decision
+
+To address this issue, we propose to implement a distributed blob garbage collector built upon the previously developed
+Distributed Task Manager.
+The de-duplicating blob store will keep track of the references pointing toward a blob in a `References` table.
+It will also keep track of the deletion requests for a blob in a `Deletions` table.
+When the garbage collector algorithm runs it will fetch from the `Deletions` table the blobs considered to be effectively deleted,
+and will check in the `References` table if there are still some references to them. If there is no more reference to a blob,
+it will be effectively deleted from the blob store.
+
+To avoid concurrency issues, where we could garbage collect a blob at the same time a new reference to it appear,
+a `reference generation` notion will be added. The de-duplicating id of the blobs which before where constructed
+using only the hash of their content,  will now include this `reference generation` too.
+At a given interval a new `reference generation` will be emitted, since then all new blobs will point to this new generation.
+
+So a `garbage collection iteration` will run only on the `reference generation` `n-2` to avoid concurrency issues.
+
+The switch of generation will be triggered by a task running on the distributed task manager. This task will
+emit an event into the event sourcing system to increment the `reference generation`.
+
+
+## Alternatives
+
+Not de-duplicating the blobs' content, this simple approach which involves storing the same
+blob a lot of times can in some scenario be really slow and costly. Albeit it can in some case be preferred for the sake of
+simplicity, data security...
+
+## Consequences
+
+This change will necessitate to extract the base blob store responsibilities (store a blob, delete a blob, read a blob)
+from the current blob store implementation which is doing the de-duplication, id generation...
+The garbage collector will use this low level blob store in order to effectively delete the blobs.
+
+One other consequence of this work, is the fact that there will be no  de-duplication on different `reference generation`,
+i.e two blobs with the same content will be stored twice now, if they were created during two different `reference generation`.
+
+When writing a blob into the de-duplicating blob store, we will need to specify the reference to the object (MessageId, AttachmentId...) we
+store the blob for. This can make some components harder to implement as we will have to propagate the references.
+
+Since we will not build a distributed task scheduler. To increment the `reference generation` and launch periodically a
+`garbage collection iteration`, the scheduling will be done by an external scheduler (cron job, kubernetes cronjob ...)
+ which will call a webadmin endpoint to launch this task periodically.
+
+## Algorithm visualisation
+
+### Generation 1 and Iteration 1
+
+ * Events
+   * `rg1` reference generation is emitted
+   * `gci1` garbage collection iteration is emitted
+   * An email is sent to `user1`, a `m1` message, and a blob `b1` are stored with `rg1`
+   * An email is sent to `user1` and `user2`, `m2` and `m3` messages, and a blob `b2` are stored with `rg1`
+
+#### Tables
+
+##### Generations
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b1      | rg1                     |
+| b2      | rg1                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m1         | b1      | rg1                     |
+| m2         | b2      | rg1                     |
+| m3         | b2      | rg1                     |
+
+##### Deletions
+
+Empty
+
+### Generation 2 / Iteration 2
+
+ * Events
+   * `rg2` reference generation is emitted
+   * `gci2` garbage collection iteration is emitted
+   * An email is sent to `user1`, a `m4` message, and a blob `b3` are stored with `rg2`
+   * An email is sent to `user1` and `user2`, `m5` and `m6` messages, and a blob `b4` are stored with `rg2`
+
+#### Tables
+
+##### Generations
+
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b1      | rg1                     |
+| b2      | rg1                     |
+| b3      | rg2                     |
+| b4      | rg2                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m1         | b1      | rg1                     |
+| m2         | b2      | rg1                     |
+| m3         | b2      | rg1                     |
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+
+##### Deletions
+
+Empty
+
+### Generation 3 / Iteration 3
+
+ * Events
+   * `rg3` reference generation is emitted
+   * `gci3` garbage collection iteration is emitted
+   * An email is sent to `user1`, a `m7` message, and a blob `b5` are stored with `rg3`
+   * An email is sent to `user1` and `user2`, `m8` and `m9` messages, and a blob `b6` are stored with `rg3`
+   * `user1` deletes `m1`, `m2`, `m7`, and `m8` with `gi3`
+   * `user2` deletes `m3` with `gi3`
+
+#### Tables: before deletions
+
+##### Generations
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b1      | rg1                     |
+| b2      | rg1                     |
+| b3      | rg2                     |
+| b4      | rg2                     |
+| b5      | rg3                     |
+| b6      | rg3                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m1         | b1      | rg1                     |
+| m2         | b2      | rg1                     |
+| m3         | b2      | rg1                     |
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+| m7         | b5      | rg3                     |
+| m8         | b6      | rg3                     |
+| m9         | b6      | rg3                     |
+
+##### Deletions
+
+Empty
+
+
+#### Tables: after deletions
+
+
+##### Generations
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b1      | rg1                     |
+| b2      | rg1                     |
+| b3      | rg2                     |
+| b4      | rg2                     |
+| b5      | rg3                     |
+| b6      | rg3                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+| m9         | b6      | rg3                     |
+
+##### Deletions
+
+| blob id | reference generation id | date  | garbage collection iteration id |
+|---------|-------------------------|-------|---------------------------------|
+| b1      | rg1                     | 10:42 | gci3                            |
+| b2      | rg1                     | 10:42 | gci3                            |
+| b2      | rg1                     | 13:37 | gci3                            |
+| b5      | rg3                     | 10:42 | gci3                            |
+| b6      | rg3                     | 10:42 | gci3                            |
+
+#### Running the algorithm
+
+ * fetch `Deletions` for `gci3` in `deletions`
+ * find distinct `reference-generation-id` of `deletions` in `generations = {rg1, rg3}`
+ * For each generation
+   * *rg1*
+     * filter `deletions` to keep only `rg1` entries and extract `blob-ids` in `concernedBlobs = {b1, b2}`
+     * fetch all references to `concernedBlobs` and build a Bloom-Filter in `foundedReferences = {}`
+     * filter `concernedBlobs` to keep only those which are not present in `foundedReferences` in `blobsToDelete = {b1, b2}`
+     * Remove `blobsToDelete` from `Blobs` and `Deletions`
+   * *rg3*
+     * filter `deletions` to keep only `rg3` entries and extract `blob-ids` in `concernedBlobs = {b5, b6}`
+     * fetch all references to `concernedBlobs` and build a Bloom-Filter in `foundedReferences = {b6}`
+     * filter `concernedBlobs` to keep only those which are not present in `foundedReferences` in `blobsToDelete = {b5}`
+     * Remove `blobsToDelete` from `Blobs` and `Deletions`
+
+
+#### Tables: after garbage collection
+
+##### Generations
+
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b3      | rg2                     |
+| b4      | rg2                     |
+| b6      | rg3                     |
+
+##### References
+
+| message id | blob id | generation id |
+|------------|---------|---------------|
+| m4         | b3      | g2            |
+| m5         | b4      | g2            |
+| m6         | b4      | g2            |
+| m9         | b6      | g3            |
+
+##### Deletions
+
+| blob id | reference generation id | date  | garbage collection iteration id |
+|---------|-------------------------|-------|---------------------------------|
+| b6      | rg3                     | 10:42 | gci3                            |
+
+### Generations 4
+
+ * Events
+   * `rg4` reference generation is emitted
+   * `gci4` garbage collection iteration is emitted
+   * `user2` deletes `m9` with `gcg4`
+
+#### Tables: before deletions
+
+##### Generations
+
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+| rg4                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+| gci4                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b3      | rg2                     |
+| b4      | rg2                     |
+| b6      | rg3                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+| m9         | b6      | rg3                     |
+
+##### Deletions
+
+| blob id | reference generation id | date  | garbage collection iteration id |
+|---------|-------------------------|-------|---------------------------------|
+| b6      | rg3                     | 10:42 | gci3                            |
+
+#### Tables: after deletions
+
+##### Generations
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+| rg4                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+| gci4                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b3      | rg2                     |
+| b4      | rg2                     |
+| b6      | rg3                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+
+##### Deletions
+
+| blob id | reference generation id | date  | garbage collection iteration id |
+|---------|-------------------------|-------|---------------------------------|
+| b6      | rg3                     | 10:42 | gci3                            |
+| b6      | rg3                     | 18:42 | gci4                            |                  |
+
+#### Running the algorithm
+
+ * fetch `Deletions` for `gci4` in `deletions`
+ * find distinct `generation-id` of `deletions` in `generations = {rg3}`
+ * For each generation
+   * *rg3*
+     * filter `deletions` to keep only `rg3` entries and extract `blob-ids` in `concernedBlobs = {b6}`
+     * fetch all references to `concernedBlobs` and build a Bloom-Filter in `foundedReferences = {}`
+     * filter `concernedBlobs` to keep only those which are not present in `foundedReferences` in `blobsToDelete = {b6}`
+     * Remove `blobsToDelete` from `Blobs` and `Deletions`
+
+
+#### Tables: after garbage collection
+
+##### Generations
+
+
+| reference generation id |
+|-------------------------|
+| rg1                     |
+| rg2                     |
+| rg3                     |
+| rg4                     |
+
+| garbage collection iteration id |
+|---------------------------------|
+| gci1                            |
+| gci2                            |
+| gci3                            |
+| gci4                            |
+
+##### Blobs
+
+| blob id | reference generation id |
+|---------|-------------------------|
+| b3      | rg2                     |
+| b4      | rg2                     |
+
+##### References
+
+| message id | blob id | reference generation id |
+|------------|---------|-------------------------|
+| m4         | b3      | rg2                     |
+| m5         | b4      | rg2                     |
+| m6         | b4      | rg2                     |
+
+##### Deletions
+
+Empty
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[james-project] 03/03: JAMES-3149 Reactive GetMessageList

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 58af5c33e911053e9971ff142b26ff640a95a518
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Apr 5 23:50:42 2020 +0700

    JAMES-3149 Reactive GetMessageList
---
 .../versions/CassandraSchemaVersionManager.java    | 22 ++---
 .../SessionWithInitializedTablesFactoryTest.java   | 14 ++--
 .../CassandraSchemaVersionManagerTest.java         |  6 +-
 .../org/apache/james/mailbox/MailboxManager.java   |  2 +-
 .../apache/james/mailbox/MailboxManagerTest.java   | 49 ++++++++----
 .../cassandra/mail/CassandraMailboxMapper.java     | 52 ++++++------
 .../cassandra/mail/CassandraMessageIdMapper.java   | 27 +++++--
 .../task/SolveMailboxInconsistenciesService.java   |  2 +-
 .../cassandra/mail/CassandraMailboxMapperTest.java | 93 ++++++++++++++--------
 .../ElasticSearchListeningMessageSearchIndex.java  |  9 +--
 .../search/ElasticSearchSearcherTest.java          |  3 +-
 .../james/mailbox/jpa/mail/JPAMailboxMapper.java   | 16 ++--
 .../jpa/mail/TransactionalMailboxMapper.java       |  5 +-
 .../lucene/search/LuceneMessageSearchIndex.java    | 10 ++-
 .../LuceneMailboxMessageSearchIndexTest.java       | 12 ++-
 .../mailbox/maildir/mail/MaildirMailboxMapper.java | 18 ++---
 .../inmemory/mail/InMemoryMailboxMapper.java       | 18 ++---
 .../james/vault/DeletedMessageVaultHook.java       | 14 ++--
 .../james/mailbox/store/StoreMailboxManager.java   | 53 ++++++------
 .../james/mailbox/store/mail/MailboxMapper.java    | 26 +++---
 .../store/quota/DefaultUserQuotaRootResolver.java  |  3 +-
 .../store/search/LazyMessageSearchIndex.java       |  4 +-
 .../mailbox/store/search/MessageSearchIndex.java   |  5 +-
 .../store/search/SimpleMessageSearchIndex.java     | 31 ++++----
 .../store/AbstractCombinationManagerTest.java      |  5 +-
 .../store/mail/model/MailboxMapperACLTest.java     | 24 +++---
 .../store/mail/model/MailboxMapperTest.java        | 44 ++--------
 .../quota/DefaultUserQuotaRootResolverTest.java    |  4 +-
 .../search/AbstractMessageSearchIndexTest.java     | 36 ++++++---
 .../CassandraSchemaVersionStartUpCheck.java        |  6 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |  4 +-
 .../cassandra/CassandraRecipientRewriteTable.java  |  2 +-
 .../jmap/draft/methods/GetMessageListMethod.java   | 71 +++++++++--------
 .../james/jmap/draft/methods/ReferenceUpdater.java |  5 +-
 .../mailet/ExtractMDNOriginalJMAPMessageId.java    |  4 +-
 .../routes/DeletedMessagesVaultRoutesTest.java     |  3 +-
 36 files changed, 378 insertions(+), 324 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
index 9b47f39..516fbd3 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraSchemaVersionManager {
     public static final SchemaVersion MIN_VERSION = new SchemaVersion(5);
     public static final SchemaVersion MAX_VERSION = new SchemaVersion(7);
@@ -65,24 +67,26 @@ public class CassandraSchemaVersionManager {
         this.minVersion = minVersion;
         this.maxVersion = maxVersion;
 
-        this.initialSchemaVersion = computeVersion();
+        this.initialSchemaVersion = computeVersion().block();
     }
 
-    public boolean isBefore(SchemaVersion minimum) {
-        return initialSchemaVersion.isBefore(minimum)
+    public Mono<Boolean> isBefore(SchemaVersion minimum) {
+        if (initialSchemaVersion.isBefore(minimum)) {
             // If we started with a legacy james then maybe schema version had been updated since then
-            && computeVersion().isBefore(minimum);
+            return computeVersion()
+                .map(computedVersion -> computedVersion.isBefore(minimum));
+        }
+        return Mono.just(false);
     }
 
-    public SchemaVersion computeVersion() {
+    public Mono<SchemaVersion> computeVersion() {
         return schemaVersionDAO
             .getCurrentSchemaVersion()
-            .block()
-            .orElseGet(() -> {
+            .map(maybeVersion -> maybeVersion.orElseGet(() -> {
                 LOGGER.warn("No schema version information found on Cassandra, we assume schema is at version {}",
                     CassandraSchemaVersionManager.DEFAULT_VERSION);
                 return DEFAULT_VERSION;
-            });
+            }));
     }
 
     public SchemaVersion getMinimumSupportedVersion() {
@@ -94,7 +98,7 @@ public class CassandraSchemaVersionManager {
     }
 
     public SchemaState computeSchemaState() {
-        SchemaVersion version = computeVersion();
+        SchemaVersion version = computeVersion().block();
         if (version.isBefore(minVersion)) {
             return TOO_OLD;
         } else if (version.isBefore(maxVersion)) {
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
index 3c3345f..817fe85 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java
@@ -83,39 +83,39 @@ class SessionWithInitializedTablesFactoryTest {
 
     @Test
     void createSessionShouldSetTheLatestSchemaVersionWhenCreatingTypesAndTables() {
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
     }
 
     @Test
     void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHaveNotChanged() {
         Session session = testee.get();
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
 
         new CassandraTableManager(MODULE, session).clearAllTables();
         versionManagerDAO(session).updateVersion(MIN_VERSION);
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
 
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
     }
 
     @Test
     void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHavePartiallyChanged() {
         Session session = testee.get();
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MAX_VERSION);
 
         new CassandraTableManager(MODULE, session).clearAllTables();
         versionManagerDAO(session).updateVersion(MIN_VERSION);
-        assertThat(versionManager(session).computeVersion())
+        assertThat(versionManager(session).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
         session.execute(SchemaBuilder.dropTable(TABLE_NAME));
         session.execute(SchemaBuilder.dropType(TYPE_NAME));
 
-        assertThat(versionManager(testee.get()).computeVersion())
+        assertThat(versionManager(testee.get()).computeVersion().block())
                 .isEqualTo(MIN_VERSION);
     }
 
diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
index 64c498e..4fb33fb 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java
@@ -74,7 +74,7 @@ class CassandraSchemaVersionManagerTest {
             minVersion,
             maxVersion);
 
-        assertThat(testee.isBefore(maxVersion)).isTrue();
+        assertThat(testee.isBefore(maxVersion).block()).isTrue();
     }
 
     @Test
@@ -89,7 +89,7 @@ class CassandraSchemaVersionManagerTest {
             minVersion,
             maxVersion);
 
-        assertThat(testee.isBefore(maxVersion)).isFalse();
+        assertThat(testee.isBefore(maxVersion).block()).isFalse();
     }
 
     @Test
@@ -107,7 +107,7 @@ class CassandraSchemaVersionManagerTest {
         when(schemaVersionDAO.getCurrentSchemaVersion())
             .thenReturn(Mono.just(Optional.of(maxVersion)));
 
-        assertThat(testee.isBefore(maxVersion)).isFalse();
+        assertThat(testee.isBefore(maxVersion).block()).isFalse();
     }
 
     @Test
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 6b83d9f..9f31163 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -251,7 +251,7 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot
      * @param session
      *            the context for this call, not null
      */
-    List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
+    Publisher<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException;
 
     /**
      * Does the given mailbox exist?
diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
index 5397ed9..f51c124 100644
--- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
+++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java
@@ -94,6 +94,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -1208,7 +1209,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .build();
 
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsOnly(cacahueteMessageId, pirouetteMessageId);
         }
 
@@ -1237,7 +1239,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsOnly(messageId);
         }
 
@@ -1264,7 +1267,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1284,7 +1288,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .from(new SearchQuery())
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1305,7 +1310,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1325,7 +1331,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .notInMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1346,7 +1353,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .notInMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
         }
 
@@ -1375,7 +1383,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(searchedMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId);
         }
 
@@ -1850,9 +1859,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .isEmpty();
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
         }
 
@@ -1889,9 +1900,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(composedMessageId1.getMessageId());
         }
 
@@ -1979,9 +1992,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
         }
 
@@ -2019,9 +2034,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> {
                 .inMailboxes(otherMailboxId)
                 .build();
 
-            assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1, messageId2);
-            assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT))
+                .collectList().block())
                 .containsExactly(messageId1);
         }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index d74ce1c..5a15d9e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -20,9 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import java.time.Duration;
-import java.util.Collection;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -82,7 +80,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
         this.versionManager = versionManager;
     }
 
-    private boolean needMailboxPathV1Support() {
+    private Mono<Boolean> needMailboxPathV1Support() {
         return versionManager.isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
     }
 
@@ -96,12 +94,15 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     private Flux<Void> deletePath(Mailbox mailbox) {
-        if (needMailboxPathV1Support()) {
-            return Flux.merge(
-                mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
-                mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
-        }
-        return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+        return needMailboxPathV1Support()
+            .flatMapMany(needSupport -> {
+                if (needSupport) {
+                    return Flux.merge(
+                        mailboxPathDAO.delete(mailbox.generateAssociatedPath()),
+                        mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+                }
+                return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath()));
+            });
     }
 
     @Override
@@ -144,11 +145,9 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) {
-        return Flux.fromIterable(mailboxIds)
-            .map(CassandraId.class::cast)
-            .concatMap(this::retrieveMailbox)
-            .toStream();
+    public Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+        CassandraId mailboxId = (CassandraId) id;
+        return retrieveMailbox(mailboxId);
     }
 
     private Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) {
@@ -164,24 +163,25 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
         String fixedNamespace = query.getFixedNamespace();
         Username fixedUser = query.getFixedUser();
 
         return listPaths(fixedNamespace, fixedUser)
             .filter(idAndPath -> query.isPathMatch(idAndPath.getMailboxPath()))
             .distinct(CassandraIdAndPath::getMailboxPath)
-            .concatMap(this::retrieveMailbox)
-            .collectList()
-            .block();
+            .concatMap(this::retrieveMailbox);
     }
 
     private Flux<CassandraIdAndPath> listPaths(String fixedNamespace, Username fixedUser) {
-        if (needMailboxPathV1Support()) {
-            return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
-                mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
-        }
-        return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+        return needMailboxPathV1Support()
+            .flatMapMany(needSupport -> {
+                if (needSupport) {
+                    return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
+                        mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser));
+                }
+                return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser);
+            });
     }
 
     private Mono<Mailbox> retrieveMailbox(CassandraIdAndPath idAndPath) {
@@ -302,12 +302,10 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
         return userMailboxRightsDAO.listRightsForUser(userName)
             .filter(mailboxId -> mailboxId.getRight().contains(right))
             .map(Pair::getLeft)
-            .flatMap(this::retrieveMailbox)
-            .collectList()
-            .block();
+            .flatMap(this::retrieveMailbox);
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 47cd80b..107e37b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -32,6 +32,7 @@ import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxId;
@@ -48,6 +49,7 @@ import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.runnable.ThrowingRunnable;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Multimap;
 
@@ -129,20 +131,31 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
-        mailboxMapper.findMailboxById(mailboxId);
-
-        messageDAO.save(mailboxMessage)
+        unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+            .then(messageDAO.save(mailboxMessage))
             .thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId))
-            .block();
+            .block());
     }
 
     @Override
     public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
-        mailboxMapper.findMailboxById(mailboxId);
+        unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId)
+            .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId)))
+            .then(saveMessageMetadata(mailboxMessage, mailboxId))
+            .block());
+    }
 
-        saveMessageMetadata(mailboxMessage, mailboxId)
-            .block();
+    private void unbox(ThrowingRunnable runnable) throws MailboxNotFoundException {
+        try {
+            runnable.run();
+        } catch (RuntimeException e) {
+            if (e.getCause() instanceof MailboxNotFoundException) {
+                throw (MailboxNotFoundException) e.getCause();
+            }
+            throw e;
+        }
     }
 
     private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index 5aeb181..a10fc6d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -375,7 +375,7 @@ public class SolveMailboxInconsistenciesService {
     }
 
     private void assertValidVersion() {
-        SchemaVersion version = versionManager.computeVersion();
+        SchemaVersion version = versionManager.computeVersion().block();
 
         boolean isVersionValid = version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION);
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index f6d1dc2..f8e6839 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -23,8 +23,6 @@ import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.mailbox.model.MailboxAssertingTool.softly;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 
 import java.util.List;
 
@@ -61,7 +59,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.runnable.ThrowingRunnable;
-import reactor.core.publisher.Mono;
 
 class CassandraMailboxMapperTest {
     private static final UidValidity UID_VALIDITY = UidValidity.of(52);
@@ -162,7 +159,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                         .isEqualTo(inboxRenamed);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
@@ -189,7 +187,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                         .isEqualTo(inboxRenamed);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
@@ -212,7 +211,8 @@ class CassandraMailboxMapperTest {
                     softly(softly)
                         .assertThat(testee.findMailboxByPath(inboxPath).block())
                         .isEqualTo(inbox);
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inbox));
@@ -235,7 +235,8 @@ class CassandraMailboxMapperTest {
                         .isInstanceOf(MailboxNotFoundException.class);
                     softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                         .isEmpty();
-                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                    softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                        .collectList().block())
                         .isEmpty();
                 }));
             }
@@ -253,9 +254,11 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(softly -> {
                 softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             });
         }
@@ -282,7 +285,8 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -304,7 +308,8 @@ class CassandraMailboxMapperTest {
             doQuietly(() -> testee.rename(inboxRenamed));
 
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -328,7 +333,8 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
                     .isInstanceOf(MailboxNotFoundException.class);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -353,7 +359,8 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -375,7 +382,8 @@ class CassandraMailboxMapperTest {
             doQuietly(() -> testee.rename(inboxRenamed));
 
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -399,7 +407,8 @@ class CassandraMailboxMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed))
                     .isInstanceOf(MailboxNotFoundException.class);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -422,11 +431,13 @@ class CassandraMailboxMapperTest {
                     .doesNotThrowAnyException();
                 softly.assertThatCode(() -> testee.findMailboxByPath(inboxPath))
                     .doesNotThrowAnyException();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -468,11 +479,13 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -495,11 +508,13 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPath).block())
                     .isEqualTo(inbox);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inbox));
@@ -525,9 +540,11 @@ class CassandraMailboxMapperTest {
                     .isInstanceOf(MailboxNotFoundException.class);
                     softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional())
                         .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .isEmpty();
             }));
         }
@@ -556,13 +573,16 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                     .isEqualTo(inboxRenamed);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
@@ -592,14 +612,17 @@ class CassandraMailboxMapperTest {
                 softly(softly)
                     .assertThat(testee.findMailboxByPath(inboxPathRenamed).block())
                     .isEqualTo(inboxRenamed);
-                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)
+                    .collectList().block())
                     .isEmpty();
-                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox ->
                         softly(softly)
                             .assertThat(searchMailbox)
                             .isEqualTo(inboxRenamed));
-                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery))
+                softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)
+                    .collectList().block())
                     .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly)
                         .assertThat(searchMailbox)
                         .isEqualTo(inboxRenamed));
@@ -763,7 +786,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -782,7 +806,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -799,7 +824,8 @@ class CassandraMailboxMapperTest {
             .username(USER)
             .expression(Wildcard.INSTANCE)
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
 
         assertThat(mailboxes).containsOnly(MAILBOX);
     }
@@ -877,7 +903,8 @@ class CassandraMailboxMapperTest {
                 .username(USER)
                 .expression(Wildcard.INSTANCE)
                 .build()
-                .asUserBound()))
+                .asUserBound())
+            .collectList().block())
             .containsOnly(MAILBOX);
     }
 }
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index b64ecb3..a85467b 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -62,6 +62,7 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
@@ -118,11 +119,11 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
     
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
 
         if (mailboxIds.isEmpty()) {
-            return ImmutableList.of();
+            return Flux.empty();
         }
 
         return searcher.search(mailboxIds, searchQuery, Optional.empty())
@@ -130,9 +131,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             .map(SearchResult::getMessageId)
             .flatMap(Mono::justOrEmpty)
             .distinct()
-            .take(limit)
-            .collect(Guavate.toImmutableList())
-            .block();
+            .take(limit);
     }
 
     @Override
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
index f7d6564..fcaa97b 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
@@ -154,7 +154,8 @@ class ElasticSearchSearcherTest {
             .stream()
             .map(ComposedMessageId::getMessageId)
             .collect(Guavate.toImmutableList());
-        assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1))
+        assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1)
+            .collectList().block())
             .containsExactlyInAnyOrderElementsOf(expectedMessageIds);
     }
 
diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
index 777edf3..c2d4514 100644
--- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
+++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java
@@ -48,8 +48,8 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -186,15 +186,13 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         try {
             String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
-            return findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
-                .getResultList()
-                .stream()
+            return Flux.fromIterable(findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike)
+                .getResultList())
                 .map(JPAMailbox::toMailbox)
-                .filter(query::matches)
-                .collect(Guavate.toImmutableList());
+                .filter(query::matches);
         } catch (PersistenceException e) {
             throw new MailboxException("Search of mailbox " + query + " failed", e);
         }
@@ -250,7 +248,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return ImmutableList.of();
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.empty();
     }
 }
diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
index 30b4da7..787db5b 100644
--- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
+++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.transaction.Mapper;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class TransactionalMailboxMapper implements MailboxMapper {
@@ -81,7 +82,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         return wrapped.findMailboxWithPathLike(query);
     }
 
@@ -106,7 +107,7 @@ public class TransactionalMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
         return wrapped.findNonPersonalMailboxes(userName, right);
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index b93306b..793f4a0 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -123,6 +123,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 /**
  * Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index
  */
@@ -461,18 +463,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         if (mailboxIds.isEmpty()) {
-            return ImmutableList.of();
+            return Flux.empty();
         }
 
-        return searchMultimap(mailboxIds, searchQuery)
+        return Flux.fromIterable(searchMultimap(mailboxIds, searchQuery)
             .stream()
             .map(searchResult -> searchResult.getMessageId().get())
             .filter(SearchUtil.distinct())
             .limit(Long.valueOf(limit).intValue())
-            .collect(Guavate.toImmutableList());
+            .collect(Guavate.toImmutableList()));
     }
     
     private List<SearchResult> searchMultimap(Collection<MailboxId> mailboxIds, SearchQuery searchQuery) throws MailboxException {
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 6ff52fb..4fa87f4 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -310,7 +310,8 @@ class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.bodyContains("My Body"));
 
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+            .collectList().block();
 
         assertThat(result).containsOnly(id1, id2);
     }
@@ -323,7 +324,8 @@ class LuceneMailboxMessageSearchIndexTest {
         List<MessageId> result = index.search(session,
                 ImmutableList.of(mailbox.getMailboxId(), mailbox3.getMailboxId()),
                 query,
-                LIMIT);
+                LIMIT)
+            .collectList().block();
 
         assertThat(result).containsOnly(id1);
     }
@@ -333,7 +335,8 @@ class LuceneMailboxMessageSearchIndexTest {
         SearchQuery query = new SearchQuery();
         query.andCriteria(SearchQuery.all());
 
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT)
+            .collectList().block();
 
         // The query is not limited to one mailbox and we have 5 indexed messages
         assertThat(result).hasSize(5);
@@ -345,7 +348,8 @@ class LuceneMailboxMessageSearchIndexTest {
         query.andCriteria(SearchQuery.all());
 
         int limit = 1;
-        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit);
+        List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit)
+            .collectList().block();
 
         assertThat(result).hasSize(limit);
     }
diff --git a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
index d9b5c0d..05d9799 100644
--- a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
+++ b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java
@@ -51,9 +51,7 @@ import org.apache.james.mailbox.store.transaction.NonTransactionalMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.steveash.guavate.Guavate;
-import com.google.common.collect.ImmutableList;
-
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class MaildirMailboxMapper extends NonTransactionalMapper implements MailboxMapper {
@@ -128,7 +126,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
     }
     
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException {
         String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query);
         final Pattern searchPattern = Pattern.compile("[" + MaildirStore.maildirDelimiter + "]"
                 + pathLike.replace(".", "\\.").replace(MaildirStore.WILDCARD, ".*"));
@@ -147,9 +145,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
             Mailbox mailbox = maildirStore.loadMailbox(session, root, query.getFixedNamespace(), query.getFixedUser(), "");
             mailboxList.add(0, mailbox);
         }
-        return mailboxList.stream()
-            .filter(query::matches)
-            .collect(Guavate.toImmutableList());
+        return Flux.fromIterable(mailboxList)
+            .filter(query::matches);
     }
 
     @Override
@@ -159,7 +156,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
             .userAndNamespaceFrom(mailbox.generateAssociatedPath())
             .expression(new PrefixedWildcard(mailbox.getName() + delimiter))
             .build()
-            .asUserBound());
+            .asUserBound())
+            .collectList().block();
         return mailboxes.size() > 0;
     }
 
@@ -333,7 +331,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return ImmutableList.of();
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.empty();
     }
 }
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
index fc55302..77db71d 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
@@ -40,10 +40,10 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 
-import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class InMemoryMailboxMapper implements MailboxMapper {
@@ -84,12 +84,10 @@ public class InMemoryMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
-        return mailboxesByPath.values()
-            .stream()
+    public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) {
+        return Flux.fromIterable(mailboxesByPath.values())
             .filter(query::matches)
-            .map(Mailbox::new)
-            .collect(Guavate.toImmutableList());
+            .map(Mailbox::new);
     }
 
     @Override
@@ -166,11 +164,9 @@ public class InMemoryMailboxMapper implements MailboxMapper {
     }
 
     @Override
-    public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException {
-        return mailboxesByPath.values()
-            .stream()
-            .filter(mailbox -> hasRightOn(mailbox, userName, right))
-            .collect(Guavate.toImmutableList());
+    public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) {
+        return Flux.fromIterable(mailboxesByPath.values())
+            .filter(mailbox -> hasRightOn(mailbox, userName, right));
     }
 
     private Boolean hasRightOn(Mailbox mailbox, Username userName, Right right) {
diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
index a69b4bd..cdb5071 100644
--- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
+++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java
@@ -35,6 +35,7 @@ import org.apache.james.mailbox.MetadataWithMailboxId;
 import org.apache.james.mailbox.SessionProvider;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.extension.PreDeletionHook;
+import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
@@ -154,18 +155,19 @@ public class DeletedMessageVaultHook implements PreDeletionHook {
             .flatMap(groupFlux -> groupFlux.reduce(DeletedMessageMailboxContext::combine));
     }
 
-    private Publisher<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
-        Username owner = retrieveMailboxUser(groupedFlux.key());
-        return groupedFlux.map(metadata -> new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId())));
+    private Flux<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException {
+        return retrieveMailboxUser(groupedFlux.key())
+            .flatMapMany(owner -> groupedFlux.map(metadata ->
+                new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId()))));
     }
 
     private Pair<MessageId, Username> toMessageIdUserPair(DeletedMessageMailboxContext deletedMessageMetadata) {
         return Pair.of(deletedMessageMetadata.getMessageId(), deletedMessageMetadata.getOwner());
     }
 
-    private Username retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
+    private Mono<Username> retrieveMailboxUser(MailboxId mailboxId) throws MailboxException {
         return mapperFactory.getMailboxMapper(session)
-            .findMailboxById(mailboxId)
-            .getUser();
+            .findMailboxByIdReactive(mailboxId)
+            .map(Mailbox::getUser);
     }
 }
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index c3d7c02..76f8bfb 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -93,6 +93,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -536,7 +537,7 @@ public class StoreMailboxManager implements MailboxManager {
             .build()
             .asUserBound();
         locker.executeWithLock(from, (LockAwareExecution<Void>) () -> {
-            List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query);
+            List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query).collectList().block();
             for (Mailbox sub : subMailboxes) {
                 String subOriginalName = sub.getName();
                 String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length());
@@ -596,7 +597,7 @@ public class StoreMailboxManager implements MailboxManager {
     }
 
     private List<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
-        List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right);
+        List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right).collectList().block();
 
         ImmutableMap<MailboxId, MailboxCounters> counters = getMailboxCounters(mailboxes, session)
             .stream()
@@ -614,16 +615,13 @@ public class StoreMailboxManager implements MailboxManager {
             .collect(Guavate.toImmutableList());
     }
 
-    private List<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
+    private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException {
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
-        Stream<Mailbox> baseMailboxes = mailboxMapper
-            .findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session))
-            .stream();
-        Stream<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
-        return Stream.concat(baseMailboxes, delegatedMailboxes)
+        Flux<Mailbox> baseMailboxes = mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session));
+        Flux<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session);
+        return Flux.merge(baseMailboxes, delegatedMailboxes)
             .distinct()
-            .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)))
-            .collect(Guavate.toImmutableList());
+            .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session)));
     }
 
     static MailboxQuery.UserBound toSingleUserQuery(MailboxQuery mailboxQuery, MailboxSession mailboxSession) {
@@ -644,12 +642,12 @@ public class StoreMailboxManager implements MailboxManager {
             .build());
     }
 
-    private Stream<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
-                                                  Right right, MailboxSession session) throws MailboxException {
+    private Flux<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery,
+                                                Right right, MailboxSession session) {
         if (mailboxQuery.isPrivateMailboxes(session)) {
-            return Stream.of();
+            return Flux.empty();
         }
-        return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right).stream();
+        return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right);
     }
 
     private MailboxMetaData toMailboxMetadata(MailboxSession session, List<Mailbox> mailboxes, Mailbox mailbox, MailboxCounters counters) throws UnsupportedRightException {
@@ -677,32 +675,31 @@ public class StoreMailboxManager implements MailboxManager {
     }
 
     @Override
-    public List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
-        ImmutableSet<MailboxId> wantedMailboxesId =
-            getInMailboxes(expression.getInMailboxes(), session)
-                .filter(id -> !expression.getNotInMailboxes().contains(id))
-                .collect(Guavate.toImmutableSet());
-
-        return index.search(session, wantedMailboxesId, expression.getSearchQuery(), limit);
+    public Flux<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException {
+        return getInMailboxes(expression.getInMailboxes(), session)
+            .filter(id -> !expression.getNotInMailboxes().contains(id))
+            .collect(Guavate.toImmutableSet())
+            .flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit)));
     }
 
-    private Stream<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
-        if (inMailboxes.isEmpty()) {
+
+    private Flux<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+       if (inMailboxes.isEmpty()) {
             return getAllReadableMailbox(session);
         } else {
             return filterReadable(inMailboxes, session);
         }
     }
 
-    private Stream<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
+    private Flux<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException {
         return searchMailboxes(MailboxQuery.builder().matchesAllMailboxNames().build(), session, Right.Read)
-            .stream()
             .map(Mailbox::getMailboxId);
     }
 
-    private Stream<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
-        return mailboxSessionMapperFactory.getMailboxMapper(session)
-            .findMailboxesById(inMailboxes)
+    private Flux<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException {
+        MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
+        return Flux.fromIterable(inMailboxes)
+            .concatMap(mailboxMapper::findMailboxByIdReactive)
             .filter(Throwing.<Mailbox>predicate(mailbox -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow())
             .map(Mailbox::getMailboxId);
     }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
index 7ace127..6c5e3b8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java
@@ -18,9 +18,7 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.stream.Stream;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
@@ -35,8 +33,7 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.store.transaction.Mapper;
 
-import com.github.fge.lambdas.Throwing;
-
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -87,26 +84,25 @@ public interface MailboxMapper extends Mapper {
     Mailbox findMailboxById(MailboxId mailboxId)
             throws MailboxException, MailboxNotFoundException;
 
-    default Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) throws MailboxException {
-        return mailboxIds.stream()
-            .flatMap(Throwing.<MailboxId, Stream<Mailbox>>function(id -> {
-                try {
-                    return Stream.of(findMailboxById(id));
-                } catch (MailboxNotFoundException e) {
-                    return Stream.empty();
-                }
-            }).sneakyThrow());
+    default Mono<Mailbox> findMailboxByIdReactive(MailboxId id) {
+        try {
+            return Mono.justOrEmpty(findMailboxById(id));
+        } catch (MailboxNotFoundException e) {
+            return Mono.empty();
+        } catch (MailboxException e) {
+            return Mono.error(e);
+        }
     }
 
     /**
      * Return a List of {@link Mailbox} for the given userName and matching the right
      */
-    List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException;
+    Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right);
 
     /**
      * Return a List of {@link Mailbox} which name is like the given name
      */
-    List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
+    Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
             throws MailboxException;
 
     /**
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
index 9c29ac4..6ea1f88 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java
@@ -118,6 +118,7 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver {
                 .user(Username.of(user))
                 .matchesAllMailboxNames()
                 .build()
-                .asUserBound());
+                .asUserBound())
+            .collectList().block();
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 4a06c64..6e75f26 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+
 /**
  * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
  * 
@@ -141,7 +143,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
     
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
         throw new UnsupportedSearchException();
     }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index eb1dcee..4873099 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -21,7 +21,6 @@ package org.apache.james.mailbox.store.search;
 
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.stream.Stream;
 
@@ -34,6 +33,8 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.SearchQuery;
 
+import reactor.core.publisher.Flux;
+
 /**
  * An index which can be used to search for MailboxMessage UID's that match a {@link SearchQuery}.
  * 
@@ -50,7 +51,7 @@ public interface MessageSearchIndex {
     /**
      * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery}
      */
-    List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
+    Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException;
 
     EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities);
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 26b602d..ed6fd17 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -53,10 +53,11 @@ import org.apache.james.mailbox.store.mail.MessageMapperFactory;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 
 import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 /**
  * {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher}
  * to match them against the {@link SearchQuery}.
@@ -108,10 +109,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     @Override
     public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
-        return searchResults(session, ImmutableList.of(mailbox).stream(), query)
-            .stream()
+        return searchResults(session, Flux.just(mailbox), query)
             .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId()))
-            .map(SearchResult::getMessageUid);
+            .map(SearchResult::getMessageUid)
+            .toStream();
     }
 
     private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException {
@@ -142,19 +143,17 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
-        MailboxMapper mailboxManager = mailboxMapperFactory.getMailboxMapper(session);
+    public Flux<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+        MailboxMapper mailboxMapper = mailboxMapperFactory.getMailboxMapper(session);
 
-        Stream<Mailbox> filteredMailboxes = mailboxIds
-            .stream()
-            .map(Throwing.function(mailboxManager::findMailboxById).sneakyThrow());
+        Flux<Mailbox> filteredMailboxes = Flux.fromIterable(mailboxIds)
+            .concatMap(Throwing.function(mailboxMapper::findMailboxByIdReactive).sneakyThrow());
 
         return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit);
     }
 
-    private List<SearchResult> searchResults(MailboxSession session, Stream<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
-        return mailboxes.flatMap(mailbox -> getSearchResultStream(session, query, mailbox))
-            .collect(Guavate.toImmutableList());
+    private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException {
+        return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox)));
     }
 
     private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) {
@@ -165,12 +164,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex {
         }
     }
 
-    private List<MessageId> getAsMessageIds(List<SearchResult> temp, long limit) {
-        return temp.stream()
-            .map(searchResult -> searchResult.getMessageId().get())
+    private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) {
+        return temp.map(searchResult -> searchResult.getMessageId().get())
             .filter(SearchUtil.distinct())
-            .limit(Long.valueOf(limit).intValue())
-            .collect(Guavate.toImmutableList());
+            .take(Long.valueOf(limit).intValue());
     }
 
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index 30dedcf..03f5900 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -52,6 +52,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public abstract class AbstractCombinationManagerTest {
 
     private static final int DEFAULT_MAXIMUM_LIMIT = 256;
@@ -163,7 +165,8 @@ public abstract class AbstractCombinationManagerTest {
 
         messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
 
-        assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)).containsOnly(messageId);
+        assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT))
+            .collectList().block()).containsOnly(messageId);
     }
 
     @Test
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index d4dfaec..c8aef54 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -233,7 +233,7 @@ public abstract class MailboxMapperACLTest {
 
     @Test
     void findMailboxesShouldReturnEmptyWhenNone() throws MailboxException {
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)).isEmpty();
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()).isEmpty();
     }
 
     @Test
@@ -246,7 +246,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(rights)
                 .asReplacement());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read)).isEmpty();
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block()).isEmpty();
     }
 
     @Test
@@ -258,7 +258,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(rights)
                 .asAddition());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
@@ -278,10 +278,10 @@ public abstract class MailboxMapperACLTest {
                 .rights(newRights)
                 .asReplacement());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block())
             .containsOnly(benwaInboxMailbox);
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -302,7 +302,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(new Rfc4314Rights())
                 .build());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -321,7 +321,7 @@ public abstract class MailboxMapperACLTest {
                 .rights(initialRights)
                 .asRemoval());
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -336,7 +336,7 @@ public abstract class MailboxMapperACLTest {
                 .asReplacement());
         mailboxMapper.delete(benwaInboxMailbox);
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -349,9 +349,9 @@ public abstract class MailboxMapperACLTest {
             new MailboxACL.Entry(user1, new Rfc4314Rights(Right.Administer)),
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
             .containsOnly(benwaInboxMailbox);
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
@@ -367,7 +367,7 @@ public abstract class MailboxMapperACLTest {
         mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block())
             .isEmpty();
     }
 
@@ -383,7 +383,7 @@ public abstract class MailboxMapperACLTest {
         mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL(
             new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Write))));
 
-        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write))
+        assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write).collectList().block())
             .containsOnly(benwaInboxMailbox);
     }
 
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
index 3fdf808..998a862 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java
@@ -193,7 +193,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(bobInboxMailbox);
     }
@@ -216,7 +217,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(benwaWorkMailbox, benwaWorkDoneMailbox, benwaWorkTodoMailbox);
     }
@@ -230,7 +232,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery);
+        List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block();
 
         assertMailboxes(mailboxes).containOnly(benwaInboxMailbox);
     }
@@ -244,7 +247,8 @@ public abstract class MailboxMapperTest {
             .build()
             .asUserBound();
 
-        assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)).isEmpty();
+        assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)
+            .collectList().block()).isEmpty();
     }
 
     @Test
@@ -253,38 +257,6 @@ public abstract class MailboxMapperTest {
         Mailbox actual = mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId());
         assertThat(actual).isEqualTo(benwaInboxMailbox);
     }
-
-    @Test
-    void findMailboxesByIdShouldReturnEmptyWhenNoIdSupplied() throws MailboxException {
-        createAll();
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of());
-
-        assertThat(mailboxes).isEmpty();
-    }
-
-    @Test
-    void findMailboxesByIdShouldReturnMailboxOfSuppliedId() throws MailboxException {
-        createAll();
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
-            benwaInboxMailbox.getMailboxId(),
-            benwaWorkMailbox.getMailboxId()));
-
-        assertThat(mailboxes).containsOnly(benwaWorkMailbox, benwaInboxMailbox);
-    }
-
-    @Test
-    void findMailboxesByIdShouldFilterOutNonExistingMailbox() throws MailboxException {
-        createAll();
-        mailboxMapper.delete(benwaWorkMailbox);
-
-        Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of(
-            benwaInboxMailbox.getMailboxId(),
-            benwaWorkMailbox.getMailboxId()));
-
-        assertThat(mailboxes).containsOnly(benwaInboxMailbox);
-    }
     
     @Test
     void findMailboxByIdShouldFailWhenAbsent() throws MailboxException {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
index d744e15..36cb715 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java
@@ -42,7 +42,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import com.google.common.collect.Lists;
+import reactor.core.publisher.Flux;
 
 class DefaultUserQuotaRootResolverTest {
 
@@ -92,7 +92,7 @@ class DefaultUserQuotaRootResolverTest {
     void retrieveAssociatedMailboxesShouldWork() throws Exception {
         MailboxMapper mockedMapper = mock(MailboxMapper.class);
         when(mockedFactory.getMailboxMapper(MAILBOX_SESSION)).thenReturn(mockedMapper);
-        when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Lists.newArrayList(MAILBOX, MAILBOX_2));
+        when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Flux.just(MAILBOX, MAILBOX_2));
 
         assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION)).containsOnly(MAILBOX, MAILBOX_2);
     }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index 8271602..6da8aa0 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -248,7 +248,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .hasSize(12)
@@ -280,7 +281,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .containsOnly(m1.getMessageId(),
@@ -316,7 +318,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(result)
             .hasSize(limit);
@@ -329,7 +332,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(result)
             .isEmpty();
@@ -353,7 +357,8 @@ public abstract class AbstractMessageSearchIndexTest {
         List<MessageId> result = messageSearchIndex.search(session,
             ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(result)
                 .hasSize(limit);
@@ -549,7 +554,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
     }
@@ -558,7 +564,8 @@ public abstract class AbstractMessageSearchIndexTest {
     void multimailboxSearchShouldReturnUidOfMessageMarkedAsSeenInOneMailbox() throws MailboxException {
         SearchQuery searchQuery = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT);
+        List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(m6.getMessageId());
     }
@@ -571,7 +578,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m8.getMessageId());
     }
@@ -584,7 +592,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId());
     }
@@ -598,7 +607,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
         // Two messages matches this query : mOther and m6
 
         assertThat(actual).hasSize(1);
@@ -614,7 +624,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(otherMailbox.getMailboxId()),
             searchQuery,
-            limit);
+            limit)
+            .collectList().block();
 
         assertThat(actual).contains(m10.getMessageId());
     }
@@ -1408,7 +1419,8 @@ public abstract class AbstractMessageSearchIndexTest {
             session,
             ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()),
             searchQuery,
-            LIMIT);
+            LIMIT)
+            .collectList().block();
 
         assertThat(actual).containsOnly(m1.getMessageId(), m2.getMessageId(), m3.getMessageId(), m4.getMessageId(), m5.getMessageId(),
             m6.getMessageId(), m7.getMessageId(), m8.getMessageId(), m9.getMessageId(), mOther.getMessageId(), mailWithAttachment.getMessageId(), mailWithInlinedAttachment.getMessageId());
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
index 2c992cb..1cf5ebb 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java
@@ -69,7 +69,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
     private CheckResult checkUpgradeAbleState() {
         String upgradeVersionMessage =
             String.format("Current schema version is %d. Recommended version is %d",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.warn(upgradeVersionMessage);
         return CheckResult.builder()
@@ -93,7 +93,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
         String versionExceedMaximumSupportedMessage =
             String.format("Current schema version is %d whereas the maximum supported version is %d. " +
                 "Recommended version is %d.",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.error(versionExceedMaximumSupportedMessage);
@@ -108,7 +108,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck {
         String versionToOldMessage =
             String.format("Current schema version is %d whereas minimum required version is %d. " +
                 "Recommended version is %d",
-                versionManager.computeVersion().getValue(),
+                versionManager.computeVersion().block().getValue(),
                 versionManager.getMinimumSupportedVersion().getValue(),
                 versionManager.getMaximumSupportedVersion().getValue());
         LOGGER.error(versionToOldMessage);
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index e272402..f9b866c 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -38,6 +38,8 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 
+import reactor.core.publisher.Flux;
+
 public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     private static class FakeMessageSearchIndexGroup extends Group {
 
@@ -80,7 +82,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) {
         throw new NotImplementedException("not implemented");
     }
 
diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
index 6dc233b..8167ace 100644
--- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
+++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java
@@ -97,7 +97,7 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl
         Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()),
             "Not supported mapping of type %s", mapping.getType());
 
-        if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION)) {
+        if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION).block()) {
             return super.listSources(mapping);
         }
 
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
index 19c7503..cd31826 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import static org.apache.james.util.ReactorUtils.context;
+
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -39,7 +41,6 @@ import org.apache.james.jmap.draft.utils.FilterToSearchQuery;
 import org.apache.james.jmap.draft.utils.SortConverter;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxId.Factory;
 import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
@@ -47,10 +48,15 @@ import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class GetMessageListMethod implements Method {
 
     private static final long DEFAULT_POSITION = 0;
@@ -89,11 +95,18 @@ public class GetMessageListMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof GetMessageListRequest);
 
         GetMessageListRequest messageListRequest = (GetMessageListRequest) request;
 
+        return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
+            () -> process(methodCallId, mailboxSession, messageListRequest)
+                .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest))))
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    private MDCBuilder mdc(GetMessageListRequest messageListRequest) {
         return MDCBuilder.create()
             .addContext(MDCBuilder.ACTION, "GET_MESSAGE_LIST")
             .addContext("accountId", messageListRequest.getAccountId())
@@ -105,38 +118,30 @@ public class GetMessageListMethod implements Method {
             .addContext("filters", messageListRequest.getFilter())
             .addContext("sorts", messageListRequest.getSort())
             .addContext("isFetchMessage", messageListRequest.isFetchMessages())
-            .addContext("isCollapseThread", messageListRequest.isCollapseThreads())
-            .wrapArround(
-                () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(),
-                    () -> process(methodCallId, mailboxSession, messageListRequest)))
-            .get();
+            .addContext("isCollapseThread", messageListRequest.isCollapseThreads());
     }
 
-    private Stream<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
-        GetMessageListResponse messageListResponse = getMessageListResponse(messageListRequest, mailboxSession);
-        Stream<JmapResponse> jmapResponse = Stream.of(JmapResponse.builder().methodCallId(methodCallId)
-            .response(messageListResponse)
-            .responseName(RESPONSE_NAME)
-            .build());
-        return Stream.concat(jmapResponse,
-            processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession));
+    private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) {
+        return getMessageListResponse(messageListRequest, mailboxSession)
+                .flatMapMany(messageListResponse -> Flux.concat(
+                    Mono.just(JmapResponse.builder().methodCallId(methodCallId)
+                        .response(messageListResponse)
+                        .responseName(RESPONSE_NAME)
+                        .build()),
+                    processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession)));
     }
 
-    private GetMessageListResponse getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
-        GetMessageListResponse.Builder builder = GetMessageListResponse.builder();
-        try {
-            MultimailboxesSearchQuery searchQuery = convertToSearchQuery(messageListRequest);
-            Long postionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
-            mailboxManager.search(searchQuery,
-                mailboxSession,
-                postionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit))
-                .stream()
-                .skip(postionValue)
-                .forEach(builder::messageId);
-            return builder.build();
-        } catch (MailboxException e) {
-            throw new RuntimeException(e);
-        }
+    private Mono<GetMessageListResponse> getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) {
+        Mono<MultimailboxesSearchQuery> searchQuery = Mono.fromCallable(() -> convertToSearchQuery(messageListRequest))
+            .subscribeOn(Schedulers.parallel());
+        Long positionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION);
+        long limit = positionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit);
+
+        return searchQuery
+            .flatMapMany(Throwing.function(query -> mailboxManager.search(query, mailboxSession, limit)))
+            .skip(positionValue)
+            .reduce(GetMessageListResponse.builder(), GetMessageListResponse.Builder::messageId)
+            .map(GetMessageListResponse.Builder::build);
     }
 
     private MultimailboxesSearchQuery convertToSearchQuery(GetMessageListRequest messageListRequest) {
@@ -174,15 +179,15 @@ public class GetMessageListMethod implements Method {
                 });
     }
     
-    private Stream<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
+    private Flux<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) {
         if (shouldChainToGetMessages(messageListRequest)) {
             GetMessagesRequest getMessagesRequest = GetMessagesRequest.builder()
                     .ids(messageListResponse.getMessageIds())
                     .properties(messageListRequest.getFetchMessageProperties())
                     .build();
-            return getMessagesMethod.processToStream(getMessagesRequest, methodCallId, mailboxSession);
+            return getMessagesMethod.process(getMessagesRequest, methodCallId, mailboxSession);
         }
-        return Stream.empty();
+        return Flux.empty();
     }
 
     private boolean shouldChainToGetMessages(GetMessageListRequest messageListRequest) {
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
index dca90fc..4abc540 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java
@@ -49,6 +49,8 @@ import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 public class ReferenceUpdater {
     public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id";
     public static final Flags FORWARDED_FLAG = new Flags("$Forwarded");
@@ -90,7 +92,8 @@ public class ReferenceUpdater {
         MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
             .from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
             .build();
-        List<MessageId> references = mailboxManager.search(searchByRFC822MessageId, session, limit);
+        List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit))
+            .collectList().block();
         try {
             MessageId reference = Iterables.getOnlyElement(references);
             List<MailboxId> mailboxIds = messageIdManager.getMessage(reference, FetchGroup.MINIMAL, session).stream()
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
index 1679fd9..823867d 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
+import reactor.core.publisher.Flux;
+
 /**
  * This mailet handles MDN messages and define a header X-JAMES-MDN-JMAP-MESSAGE-ID referencing
  * the original message (by its Jmap Id) asking for the recipient to send an MDN.
@@ -107,7 +109,7 @@ public class ExtractMDNOriginalJMAPMessageId extends GenericMailet {
             MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery
                 .from(new SearchQuery(SearchQuery.mimeMessageID(messageId)))
                 .build();
-            return mailboxManager.search(searchByRFC822MessageId, session, limit).stream().findFirst();
+            return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)).toStream().findFirst();
         } catch (MailboxException | UsersRepositoryException e) {
             LOGGER.error("unable to find message with Message-Id: " + messageId, e);
         }
diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
index 5990161..3e7dcb7 100644
--- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java
@@ -2262,7 +2262,8 @@ class DeletedMessagesVaultRoutesTest {
         MailboxSession session = mailboxManager.createSystemSession(username);
         int limitToOneMessage = 1;
 
-        return !mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage)
+        return !Flux.from(mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage))
+            .collectList().block()
             .isEmpty();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org