You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/05/05 07:51:03 UTC

[james-project] 04/07: JAMES-3155 ElasticSearchListeningMessageSearchIndex can be reactive

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 08c98e6ebf95979038d31e625bc0e9f668e9cf52
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Apr 27 09:05:50 2020 +0700

    JAMES-3155 ElasticSearchListeningMessageSearchIndex can be reactive
---
 .../james/backends/es/ElasticSearchIndexer.java    |  4 +-
 .../backends/es/ElasticSearchIndexerTest.java      |  4 +-
 .../ElasticSearchListeningMessageSearchIndex.java  | 36 ++++++------
 ...asticSearchListeningMessageSearchIndexTest.java | 66 +++++++++++-----------
 .../lucene/search/LuceneMessageSearchIndex.java    | 42 ++++++++------
 .../LuceneMailboxMessageSearchIndexTest.java       | 10 ++--
 .../store/search/LazyMessageSearchIndex.java       | 19 ++++---
 .../store/search/ListeningMessageSearchIndex.java  | 59 +++++++++++--------
 .../mailbox/tools/indexer/ReIndexerPerformer.java  |  6 +-
 .../tools/indexer/CassandraReIndexerImplTest.java  |  5 ++
 .../tools/indexer/MessageIdReIndexerImplTest.java  |  4 ++
 .../mailbox/tools/indexer/ReIndexerImplTest.java   |  5 ++
 .../org/apache/james/FakeMessageSearchIndex.java   |  9 +--
 .../james/webadmin/routes/MailboxesRoutesTest.java | 10 +++-
 .../james/webadmin/routes/MessageRoutesTest.java   |  9 ++-
 .../webadmin/routes/UserMailboxesRoutesTest.java   |  3 +
 16 files changed, 169 insertions(+), 122 deletions(-)

diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index 0fcbc11..3b5c8a2 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -113,8 +113,8 @@ public class ElasticSearchIndexer {
             });
     }
 
-    public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
-        deleteByQueryPerformer.perform(queryBuilder, routingKey).block();
+    public Mono<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
+        return deleteByQueryPerformer.perform(queryBuilder, routingKey);
     }
 
     private void checkArgument(String content) {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index f5f8a40..ac320ca 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -163,7 +163,7 @@ class ElasticSearchIndexerTest {
         testee.index(documentId, content, routingKey).block();
         elasticSearch.awaitForElasticSearch();
         
-        testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey);
+        testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey).block();
         elasticSearch.awaitForElasticSearch();
         
         CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
@@ -193,7 +193,7 @@ class ElasticSearchIndexerTest {
         testee.index(documentId3, content3, ROUTING).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING);
+        testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING).block();
         elasticSearch.awaitForElasticSearch();
         
         CALMLY_AWAIT.atMost(Duration.TEN_SECONDS)
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index a85467b..3d2d099 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.elasticsearch.events;
 
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
@@ -135,18 +134,19 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
 
     @Override
-    public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws IOException {
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
         LOGGER.info("Indexing mailbox {}-{} of user {} on message {}",
             mailbox.getName(),
             mailbox.getMailboxId(),
             session.getUser().asString(),
             message.getUid());
 
-        String jsonContent = generateIndexedJson(mailbox, message, session);
+        RoutingKey from = routingKeyFactory.from(mailbox.getMailboxId());
+        DocumentId id = indexIdFor(mailbox, message.getUid());
 
-        elasticSearchIndexer
-            .index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()))
-            .block();
+        return Mono.fromCallable(() -> generateIndexedJson(mailbox, message, session))
+            .flatMap(jsonContent -> elasticSearchIndexer.index(id, jsonContent, from))
+            .then();
     }
 
     private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
@@ -164,36 +164,36 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
 
     @Override
-    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
-        elasticSearchIndexer
+    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+        return elasticSearchIndexer
             .delete(expungedUids.stream()
                 .map(uid ->  indexIdFor(mailbox, uid))
                 .collect(Guavate.toImmutableList()),
                 routingKeyFactory.from(mailbox.getMailboxId()))
-            .block();
+            .then();
     }
 
     @Override
-    public void deleteAll(MailboxSession session, MailboxId mailboxId) {
+    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
         TermQueryBuilder queryBuilder = termQuery(
             JsonMessageConstants.MAILBOX_ID,
             mailboxId.serialize());
 
-        elasticSearchIndexer
+        return elasticSearchIndexer
                 .deleteAllMatchingQuery(queryBuilder, routingKeyFactory.from(mailboxId));
     }
 
     @Override
-    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
-        ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream()
+    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+        RoutingKey routingKey = routingKeyFactory.from(mailbox.getMailboxId());
+
+        return Flux.fromIterable(updatedFlagsList)
             .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
                 updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
                 .sneakyThrow())
-            .collect(Guavate.toImmutableList());
-
-        elasticSearchIndexer
-            .update(updates, routingKeyFactory.from(mailbox.getMailboxId()))
-            .block();
+            .collect(Guavate.toImmutableList())
+            .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))
+            .then();
     }
 
     private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException {
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index bd4fc78..e3f16a3 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -192,7 +192,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
     
     @Test
     void addShouldIndexMessageWithoutAttachment() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -203,7 +203,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void addShouldIndexMessageWithAttachment() throws Exception {
-        testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT);
+        testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -213,8 +213,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void addShouldBeIndempotent() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
+        testee.add(session, mailbox, MESSAGE_1).block();
 
         elasticSearch.awaitForElasticSearch();
 
@@ -225,8 +225,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void addShouldIndexMultipleMessages() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
-        testee.add(session, mailbox, MESSAGE_2);
+        testee.add(session, mailbox, MESSAGE_1).block();
+        testee.add(session, mailbox, MESSAGE_2).block();
 
         elasticSearch.awaitForElasticSearch();
 
@@ -245,7 +245,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         testee = new ElasticSearchListeningMessageSearchIndex(mapperFactory, elasticSearchIndexer, elasticSearchSearcher,
             messageToElasticSearchJson, sessionProvider, new MailboxIdRoutingKeyFactory());
 
-        testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT);
+        testee.add(session, mailbox, MESSAGE_WITH_ATTACHMENT).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -258,7 +258,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.getDockerElasticSearch().pause();
         Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
 
-        assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1))
+        assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1).block())
             .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
@@ -266,10 +266,10 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteShouldRemoveIndex() throws IOException {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -279,12 +279,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteShouldOnlyRemoveIndexesPassedAsArguments() throws IOException {
-        testee.add(session, mailbox, MESSAGE_1);
-        testee.add(session, mailbox, MESSAGE_2);
+        testee.add(session, mailbox, MESSAGE_1).block();
+        testee.add(session, mailbox, MESSAGE_2).block();
 
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -294,12 +294,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteShouldRemoveMultipleIndexes() throws IOException {
-        testee.add(session, mailbox, MESSAGE_1);
-        testee.add(session, mailbox, MESSAGE_2);
+        testee.add(session, mailbox, MESSAGE_1).block();
+        testee.add(session, mailbox, MESSAGE_2).block();
 
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1, MESSAGE_UID_2));
+        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1, MESSAGE_UID_2)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -309,11 +309,11 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteShouldBeIdempotent() throws IOException {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
-        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1));
+        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
+        testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -323,7 +323,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteShouldNotThrowOnUnknownMessageUid() {
-        assertThatCode(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
+        assertThatCode(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block())
             .doesNotThrowAnyException();
     }
 
@@ -332,7 +332,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.getDockerElasticSearch().pause();
         Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
 
-        assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
+        assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)).block())
             .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
@@ -340,7 +340,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void updateShouldUpdateIndex() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
         Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -351,7 +351,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
             .newFlags(newFlags)
             .build();
 
-        testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+        testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -361,7 +361,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void updateShouldNotUpdateNorThrowOnUnknownMessageUid() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
         Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -372,7 +372,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
             .newFlags(newFlags)
             .build();
 
-        testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+        testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -382,7 +382,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void updateShouldBeIdempotent() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
+        testee.add(session, mailbox, MESSAGE_1).block();
         elasticSearch.awaitForElasticSearch();
 
         Flags newFlags = new Flags(Flags.Flag.ANSWERED);
@@ -393,8 +393,8 @@ class ElasticSearchListeningMessageSearchIndexTest {
             .newFlags(newFlags)
             .build();
 
-        testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
-        testee.update(session, mailbox, Lists.newArrayList(updatedFlags));
+        testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
+        testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
@@ -415,7 +415,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
             .newFlags(newFlags)
             .build();
 
-        assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)))
+        assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)).block())
             .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
@@ -423,12 +423,12 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteAllShouldRemoveAllIndexes() throws Exception {
-        testee.add(session, mailbox, MESSAGE_1);
-        testee.add(session, mailbox, MESSAGE_2);
+        testee.add(session, mailbox, MESSAGE_1).block();
+        testee.add(session, mailbox, MESSAGE_2).block();
 
         elasticSearch.awaitForElasticSearch();
 
-        testee.deleteAll(session, mailbox.getMailboxId());
+        testee.deleteAll(session, mailbox.getMailboxId()).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = new SearchQuery(SearchQuery.all());
@@ -438,7 +438,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
     @Test
     void deleteAllShouldNotThrowWhenEmptyIndex() {
-        assertThatCode(() -> testee.deleteAll(session, mailbox.getMailboxId()))
+        assertThatCode(() -> testee.deleteAll(session, mailbox.getMailboxId()).block())
             .doesNotThrowAnyException();
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index ff86117..e3a4f25 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -119,11 +119,13 @@ import org.apache.lucene.util.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index
@@ -1176,19 +1178,23 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public void add(MailboxSession session, Mailbox mailbox, MailboxMessage membership) throws IOException, MimeException {
-        Document doc = createMessageDocument(session, membership);
-        Document flagsDoc = createFlagsDocument(membership);
-
-        writer.addDocument(doc);
-        writer.addDocument(flagsDoc);
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage membership) {
+        return Mono.fromRunnable(Throwing.runnable(() -> {
+            Document doc = createMessageDocument(session, membership);
+            Document flagsDoc = createFlagsDocument(membership);
+
+            writer.addDocument(doc);
+            writer.addDocument(flagsDoc);
+        }));
     }
 
     @Override
-    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException {
-        for (UpdatedFlags updatedFlags : updatedFlagsList) {
-            update(mailbox, updatedFlags.getUid(), updatedFlags.getNewFlags());
-        }
+    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+        return Mono.fromRunnable(Throwing.runnable(() -> {
+            for (UpdatedFlags updatedFlags : updatedFlagsList) {
+                update(mailbox, updatedFlags.getUid(), updatedFlags.getNewFlags());
+            }
+        }));
     }
 
     private void update(Mailbox mailbox, MessageUid uid, Flags f) throws IOException {
@@ -1266,16 +1272,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws IOException {
-        Collection<MessageRange> messageRanges = MessageRange.toRanges(expungedUids);
-        for (MessageRange messageRange : messageRanges) {
-            delete(mailbox.getMailboxId(), messageRange);
-        }
+    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+        return Mono.fromRunnable(Throwing.runnable(() -> {
+            Collection<MessageRange> messageRanges = MessageRange.toRanges(expungedUids);
+            for (MessageRange messageRange : messageRanges) {
+                delete(mailbox.getMailboxId(), messageRange);
+            }
+        }));
     }
 
     @Override
-    public void deleteAll(MailboxSession session, MailboxId mailboxId) throws IOException {
-        delete(mailboxId, MessageRange.all());
+    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
+        return Mono.fromRunnable(Throwing.runnable(() -> delete(mailboxId, MessageRange.all())));
     }
 
     public void delete(MailboxId mailboxId, MessageRange range) throws IOException {
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 4fa87f4..d7338f2 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -133,7 +133,7 @@ class LuceneMailboxMessageSearchIndexTest {
             .internalDate(new Date())
             .body("My Body".getBytes(StandardCharsets.UTF_8))
             .size(200);
-        index.add(session, mailbox, builder1.build(id1));
+        index.add(session, mailbox, builder1.build(id1)).block();
 
         uid2 = MessageUid.of(1);
         MessageBuilder builder2 = new MessageBuilder()
@@ -144,7 +144,7 @@ class LuceneMailboxMessageSearchIndexTest {
             .internalDate(new Date())
             .body("My Body".getBytes(StandardCharsets.UTF_8))
             .size(20);
-        index.add(session, mailbox2, builder2.build(id2));
+        index.add(session, mailbox2, builder2.build(id2)).block();
         
         uid3 = MessageUid.of(2);
         Calendar cal = Calendar.getInstance();
@@ -157,7 +157,7 @@ class LuceneMailboxMessageSearchIndexTest {
             .internalDate(cal.getTime())
             .body("My Otherbody".getBytes(StandardCharsets.UTF_8))
             .size(20);
-        index.add(session, mailbox, builder3.build(id3));
+        index.add(session, mailbox, builder3.build(id3)).block();
         
         uid4 = MessageUid.of(3);
         Calendar cal2 = Calendar.getInstance();
@@ -170,7 +170,7 @@ class LuceneMailboxMessageSearchIndexTest {
             .internalDate(cal2.getTime())
             .body("My Otherbody2".getBytes(StandardCharsets.UTF_8))
             .size(20);
-        index.add(session, mailbox, builder4.build(id4));
+        index.add(session, mailbox, builder4.build(id4)).block();
         
         uid5 = MessageUid.of(10);
         MessageBuilder builder = new MessageBuilder();
@@ -181,7 +181,7 @@ class LuceneMailboxMessageSearchIndexTest {
         builder.body(StandardCharsets.US_ASCII.encode(BODY).array());
         builder.uid(uid5);
         builder.mailboxId(TEST_ID_3);
-        index.add(session, mailbox3, builder.build(id5));
+        index.add(session, mailbox3, builder.build(id5)).block();
 
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 6e75f26..eac5ee5 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 /**
  * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
@@ -88,18 +89,18 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception {
-        index.add(session, mailbox, message);
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
+        return index.add(session, mailbox, message);
     }
 
     @Override
-    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception {
-        index.delete(session, mailbox, expungedUids);
+    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+        return index.delete(session, mailbox, expungedUids);
     }
 
     @Override
-    public void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception {
-        index.deleteAll(session, mailboxId);
+    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
+        return index.deleteAll(session, mailboxId);
     }
 
     /**
@@ -125,7 +126,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
                 while (messages.hasNext()) {
                     final MailboxMessage message = messages.next();
                     try {
-                        add(session, mailbox, message);
+                        add(session, mailbox, message).block();
                     } catch (Exception e) {
                         LOGGER.error("Unable to index message {} in mailbox {}", message.getUid(), mailbox.getName(), e);
                     }
@@ -137,8 +138,8 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception {
-        index.update(session, mailbox, updatedFlagsList);
+    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+        return index.update(session, mailbox, updatedFlagsList);
     }
     
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index e38c8cd..77a7a8b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.search;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.stream.Stream;
 
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
@@ -38,14 +37,16 @@ import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 /**
  * {@link MessageSearchIndex} which needs to get registered as global {@link MailboxListener} and so get
  * notified about message changes. This will then allow to update the underlying index.
  */
-public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener.GroupMailboxListener {
+public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener.ReactiveGroupMailboxListener {
     private static final Logger LOGGER = LoggerFactory.getLogger(ListeningMessageSearchIndex.class);
 
     protected static final int UNLIMITED = -1;
@@ -68,45 +69,53 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * something relevant is received
      */
     @Override
-    public void event(Event event) throws Exception {
-        handleMailboxEvent(event,
+    public Mono<Void> reactiveEvent(Event event) {
+        return handleMailboxEvent(event,
             sessionProvider.createSystemSession(event.getUsername()),
             (MailboxEvent) event);
     }
 
-    private void handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) throws Exception {
+    private Mono<Void> handleMailboxEvent(Event event, MailboxSession session, MailboxEvent mailboxEvent) {
         MailboxId mailboxId = mailboxEvent.getMailboxId();
 
         if (event instanceof Added) {
-            Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
-            handleAdded(session, mailbox, (Added) event);
+            return factory.getMailboxMapper(session)
+                .findMailboxByIdReactive(mailboxId)
+                .flatMap(mailbox -> handleAdded(session, mailbox, (Added) event));
         } else if (event instanceof Expunged) {
-            Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
             Expunged expunged = (Expunged) event;
-            delete(session, mailbox, expunged.getUids());
+
+            return factory.getMailboxMapper(session)
+                .findMailboxByIdReactive(mailboxId)
+                .flatMap(mailbox -> delete(session, mailbox, expunged.getUids()));
         } else if (event instanceof FlagsUpdated) {
-            Mailbox mailbox = factory.getMailboxMapper(session).findMailboxById(mailboxId);
             FlagsUpdated flagsUpdated = (FlagsUpdated) event;
-            update(session, mailbox, flagsUpdated.getUpdatedFlags());
+
+            return factory.getMailboxMapper(session)
+                .findMailboxByIdReactive(mailboxId)
+                .flatMap(mailbox -> update(session, mailbox, flagsUpdated.getUpdatedFlags()));
         } else if (event instanceof MailboxDeletion) {
-            deleteAll(session, mailboxId);
+            return deleteAll(session, mailboxId);
+        } else {
+            return Mono.empty();
         }
     }
 
-    private void handleAdded(MailboxSession session, Mailbox mailbox, Added added) {
-        MessageRange.toRanges(added.getUids())
-            .stream()
-            .flatMap(range -> retrieveMailboxMessages(session, mailbox, range))
-            .forEach(Throwing.<MailboxMessage>consumer(mailboxMessage -> add(session, mailbox, mailboxMessage)).sneakyThrow());
+    private Mono<Void> handleAdded(MailboxSession session, Mailbox mailbox, Added added) {
+        return Flux.fromIterable(MessageRange.toRanges(added.getUids()))
+            .concatMap(range -> retrieveMailboxMessages(session, mailbox, range))
+            .concatMap(mailboxMessage -> add(session, mailbox, mailboxMessage))
+            .then();
     }
 
-    private Stream<MailboxMessage> retrieveMailboxMessages(MailboxSession session, Mailbox mailbox, MessageRange range) {
+    private Flux<MailboxMessage> retrieveMailboxMessages(MailboxSession session, Mailbox mailbox, MessageRange range) {
         try {
-            return Iterators.toStream(factory.getMessageMapper(session)
+            return Iterators.toFlux(factory.getMessageMapper(session)
                 .findInMailbox(mailbox, range, FetchType.Full, UNLIMITED));
         } catch (Exception e) {
+            // todo this error handling makes us lose messages!
             LOGGER.error("Could not retrieve message {} in mailbox {}", range.toString(), mailbox.getMailboxId().serialize(), e);
-            return Stream.empty();
+            return Flux.empty();
         }
     }
 
@@ -117,7 +126,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * @param mailbox mailbox on which the message addition was performed
      * @param message The added message
      */
-    public abstract void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception;
+    public abstract Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message);
 
     /**
      * Delete the concerned UIDs for the given {@link Mailbox} from the index
@@ -126,7 +135,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * @param mailbox      mailbox on which the expunge was performed
      * @param expungedUids UIDS to be deleted
      */
-    public abstract void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception;
+    public abstract Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids);
 
     /**
      * Delete the messages contained in the given {@link Mailbox} from the index
@@ -134,7 +143,7 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * @param session The mailbox session performing the expunge
      * @param mailboxId mailboxId on which the expunge was performed
      */
-    public abstract void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception;
+    public abstract Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId);
 
     /**
      * Update the messages concerned by the updated flags list for the given {@link Mailbox}
@@ -143,5 +152,5 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * @param mailbox          mailbox containing the updated messages
      * @param updatedFlagsList list of flags that were updated
      */
-    public abstract void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception;
+    public abstract Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList);
 }
diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
index 0f8cb27..8e06511 100644
--- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
+++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java
@@ -74,7 +74,7 @@ public class ReIndexerPerformer {
         LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize());
         MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER);
         Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
-        messageSearchIndex.deleteAll(mailboxSession, mailboxId);
+        messageSearchIndex.deleteAll(mailboxSession, mailboxId).block();
         try {
             return Iterators.toStream(
                 mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
@@ -163,7 +163,7 @@ public class ReIndexerPerformer {
         try {
             MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session);
             Mailbox mailbox = mailboxMapper.findMailboxById(mailboxMessage.getMailboxId());
-            messageSearchIndex.add(session, mailbox, mailboxMessage);
+            messageSearchIndex.add(session, mailbox, mailboxMessage).block();
             return Task.Result.COMPLETED;
         } catch (Exception e) {
             LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e);
@@ -189,7 +189,7 @@ public class ReIndexerPerformer {
         try {
             Optional.of(uid)
                 .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid)))
-                .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message)));
+                .ifPresent(message -> messageSearchIndex.add(mailboxSession, mailbox, message).block());
             reprocessingContext.recordSuccess();
             return Task.Result.COMPLETED;
         } catch (Exception e) {
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
index 2362609..c147110 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/CassandraReIndexerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -51,6 +52,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.base.Strings;
 
+import reactor.core.publisher.Mono;
+
 public class CassandraReIndexerImplTest {
     private static final Username USERNAME = Username.of("benwa@apache.org");
     public static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
@@ -67,6 +70,8 @@ public class CassandraReIndexerImplTest {
         mailboxManager = CassandraMailboxManagerProvider.provideMailboxManager(cassandra, PreDeletionHooks.NO_PRE_DELETION_HOOK);
         MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
         messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+        when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+        when(messageSearchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
             mailboxManager, mailboxSessionMapperFactory);
     }
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
index 5aefb71..823114f 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/MessageIdReIndexerImplTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
@@ -41,6 +42,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import reactor.core.publisher.Mono;
+
 public class MessageIdReIndexerImplTest {
     private static final Username USERNAME = Username.of("benwa@apache.org");
     public static final MailboxPath INBOX = MailboxPath.inbox(USERNAME);
@@ -56,6 +59,7 @@ public class MessageIdReIndexerImplTest {
         mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
         MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
         messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+        when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
         reindexerPerformer = new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory);
         reIndexer = new MessageIdReIndexerImpl(reindexerPerformer);
     }
diff --git a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
index f38f39c..d5b15e7 100644
--- a/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
+++ b/mailbox/tools/indexer/src/test/java/org/apache/mailbox/tools/indexer/ReIndexerImplTest.java
@@ -25,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
@@ -46,6 +47,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
+import reactor.core.publisher.Mono;
+
 public class ReIndexerImplTest {
 
     private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -60,6 +63,8 @@ public class ReIndexerImplTest {
         mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
         MailboxSessionMapperFactory mailboxSessionMapperFactory = mailboxManager.getMapperFactory();
         messageSearchIndex = mock(ListeningMessageSearchIndex.class);
+        when(messageSearchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+        when(messageSearchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         reIndexer = new ReIndexerImpl(new ReIndexerPerformer(mailboxManager, messageSearchIndex, mailboxSessionMapperFactory),
             mailboxManager, mailboxSessionMapperFactory);
     }
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index f9b866c..46fe6d0 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     private static class FakeMessageSearchIndexGroup extends Group {
@@ -52,22 +53,22 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws Exception {
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
         throw new NotImplementedException("not implemented");
     }
 
     @Override
-    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws Exception {
+    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
         throw new NotImplementedException("not implemented");
     }
 
     @Override
-    public void deleteAll(MailboxSession session, MailboxId mailboxId) throws Exception {
+    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
         throw new NotImplementedException("not implemented");
     }
 
     @Override
-    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws Exception {
+    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
         throw new NotImplementedException("not implemented");
     }
 
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
index 9d211d4..05a333c 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MailboxesRoutesTest.java
@@ -26,7 +26,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -64,9 +64,11 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
 
 import io.restassured.RestAssured;
+import reactor.core.publisher.Mono;
 
 class MailboxesRoutesTest {
     private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -83,6 +85,8 @@ class MailboxesRoutesTest {
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         InMemoryId.Factory mailboxIdFactory = new InMemoryId.Factory();
         searchIndex = mock(ListeningMessageSearchIndex.class);
+        Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+        Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
             mailboxManager,
             searchIndex,
@@ -750,7 +754,7 @@ class MailboxesRoutesTest {
                     .basePath(TasksRoutes.BASE)
                     .get(taskId + "/await");
 
-                doNothing().when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
+                doReturn(Mono.empty()).when(searchIndex).add(any(MailboxSession.class), any(Mailbox.class), any(MailboxMessage.class));
 
                 String fixingTaskId = with()
                     .queryParam("reIndexFailedMessagesOf", taskId)
@@ -843,6 +847,8 @@ class MailboxesRoutesTest {
                     .get(taskId + "/await");
 
                 reset(searchIndex);
+                Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+                Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
 
                 String fixingTaskId = with()
                     .queryParam("reIndexFailedMessagesOf", taskId)
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
index 02be730..45c4d98 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/MessageRoutesTest.java
@@ -55,9 +55,12 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
-import org.testcontainers.shaded.com.google.common.collect.ImmutableSet;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableSet;
 
 import io.restassured.RestAssured;
+import reactor.core.publisher.Mono;
 
 class MessageRoutesTest {
     private static final Username USERNAME = Username.of("benwa@apache.org");
@@ -72,7 +75,9 @@ class MessageRoutesTest {
     void beforeEach() {
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
-        searchIndex = mock(ListeningMessageSearchIndex.class);
+        searchIndex = mock(ListeningMessageSearchIndex.class);;
+        Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+        Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
             mailboxManager,
             searchIndex,
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
index 7b6875d..5d21359 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/UserMailboxesRoutesTest.java
@@ -78,6 +78,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -103,6 +104,8 @@ class UserMailboxesRoutesTest {
 
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         searchIndex = mock(ListeningMessageSearchIndex.class);
+        Mockito.when(searchIndex.add(any(), any(), any())).thenReturn(Mono.empty());
+        Mockito.when(searchIndex.deleteAll(any(), any())).thenReturn(Mono.empty());
         ReIndexerPerformer reIndexerPerformer = new ReIndexerPerformer(
             mailboxManager,
             searchIndex,


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org