You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/08 02:25:48 UTC

[james-project] 03/06: JAMES-3144 Use ElasticSearch reactively

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 97e06e9ccafa1d5a8869fdcc2b5ead42564a3c90
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 13 16:05:21 2020 +0100

    JAMES-3144 Use ElasticSearch reactively
---
 .../apache/james/backends/es/ClientProvider.java   |  12 +-
 .../james/backends/es/DeleteByQueryPerformer.java  |  17 +-
 .../backends/es/ElasticSearchHealthCheck.java      |   5 +-
 .../james/backends/es/ElasticSearchIndexer.java    |  67 ++++----
 .../james/backends/es/IndexCreationFactory.java    |  11 +-
 .../apache/james/backends/es/ListenerToFuture.java |  51 ------
 .../james/backends/es/NodeMappingFactory.java      |   7 +-
 .../backends/es/ReactorElasticSearchClient.java    | 171 +++++++++++++++++++++
 .../james/backends/es/search/ScrolledSearch.java   | 113 ++++++--------
 .../es/ClientProviderImplConnectionContract.java   |   5 +-
 .../es/ElasticSearchHealthCheckConnectionTest.java |   3 +-
 .../backends/es/ElasticSearchIndexerTest.java      |  72 +++++----
 .../backends/es/IndexCreationFactoryTest.java      |   3 +-
 .../backends/es/NodeMappingFactoryAuthTest.java    |   3 +-
 .../james/backends/es/NodeMappingFactoryTest.java  |   3 +-
 .../backends/es/search/ScrolledSearchTest.java     |  37 +++--
 .../elasticsearch/MailboxIndexCreationUtil.java    |  14 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |  38 +++--
 .../search/ElasticSearchSearcher.java              |  23 +--
 .../ElasticSearchIntegrationTest.java              |   4 +-
 ...asticSearchListeningMessageSearchIndexTest.java |  10 +-
 .../search/ElasticSearchSearcherTest.java          |   4 +-
 .../elasticsearch/ElasticSearchQuotaSearcher.java  |  35 ++---
 .../QuotaSearchIndexCreationUtil.java              |   6 +-
 .../events/ElasticSearchQuotaMailboxListener.java  |   8 +-
 ...lasticSearchQuotaSearchTestSystemExtension.java |   4 +-
 .../ElasticSearchQuotaMailboxListenerTest.java     |  13 +-
 .../host/ElasticSearchHostSystem.java              |   4 +-
 .../modules/mailbox/ElasticSearchClientModule.java |   4 +-
 .../mailbox/ElasticSearchMailboxModule.java        |  10 +-
 .../mailbox/ElasticSearchQuotaSearcherModule.java  |  10 +-
 .../modules/mailbox/ElasticSearchStartUpCheck.java |   6 +-
 .../test/java/org/apache/james/ESReporterTest.java |   5 +-
 .../routes/ElasticSearchQuotaSearchExtension.java  |   4 +-
 34 files changed, 450 insertions(+), 332 deletions(-)

diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
index b879a88..247ed50 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
@@ -57,7 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-public class ClientProvider implements Provider<RestHighLevelClient> {
+public class ClientProvider implements Provider<ReactorElasticSearchClient> {
 
     private static class HttpAsyncClientConfigurer {
 
@@ -165,15 +165,17 @@ public class ClientProvider implements Provider<RestHighLevelClient> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
 
     private final ElasticSearchConfiguration configuration;
-    private final RestHighLevelClient client;
+    private final RestHighLevelClient elasticSearchRestHighLevelClient;
     private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
+    private final ReactorElasticSearchClient client;
 
     @Inject
     @VisibleForTesting
     ClientProvider(ElasticSearchConfiguration configuration) {
         this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
         this.configuration = configuration;
-        this.client = connect(configuration);
+        this.elasticSearchRestHighLevelClient = connect(configuration);
+        this.client = new ReactorElasticSearchClient(this.elasticSearchRestHighLevelClient);
     }
 
     private RestHighLevelClient connect(ElasticSearchConfiguration configuration) {
@@ -206,12 +208,12 @@ public class ClientProvider implements Provider<RestHighLevelClient> {
     }
 
     @Override
-    public RestHighLevelClient get() {
+    public ReactorElasticSearchClient get() {
         return client;
     }
 
     @PreDestroy
     public void close() throws IOException {
-        client.close();
+        elasticSearchRestHighLevelClient.close();
     }
 }
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index 66ad682..5277694 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.SearchHit;
@@ -34,18 +33,17 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class DeleteByQueryPerformer {
     private static final TimeValue TIMEOUT = new TimeValue(60000);
 
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
     private final int batchSize;
     private final WriteAliasName aliasName;
 
     @VisibleForTesting
-    DeleteByQueryPerformer(RestHighLevelClient client, int batchSize, WriteAliasName aliasName) {
+    DeleteByQueryPerformer(ReactorElasticSearchClient client, int batchSize, WriteAliasName aliasName) {
         this.client = client;
         this.batchSize = batchSize;
         this.aliasName = aliasName;
@@ -54,9 +52,10 @@ public class DeleteByQueryPerformer {
     public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) {
         SearchRequest searchRequest = prepareSearch(queryBuilder, routingKey);
 
-        return Flux.fromStream(new ScrolledSearch(client, searchRequest).searchResponses())
-            .flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse, routingKey))
-            .thenEmpty(Mono.empty());
+        return new ScrolledSearch(client, searchRequest).searchResponses()
+            .filter(searchResponse -> searchResponse.getHits().getHits().length > 0)
+            .flatMap(searchResponse -> deleteRetrievedIds(searchResponse, routingKey))
+            .then();
     }
 
     private SearchRequest prepareSearch(QueryBuilder queryBuilder, RoutingKey routingKey) {
@@ -73,7 +72,7 @@ public class DeleteByQueryPerformer {
             .size(batchSize);
     }
 
-    private Mono<BulkResponse> deleteRetrievedIds(RestHighLevelClient client, SearchResponse searchResponse, RoutingKey routingKey) {
+    private Mono<BulkResponse> deleteRetrievedIds(SearchResponse searchResponse, RoutingKey routingKey) {
         BulkRequest request = new BulkRequest();
 
         for (SearchHit hit : searchResponse.getHits()) {
@@ -84,6 +83,6 @@ public class DeleteByQueryPerformer {
                     .routing(routingKey.asString()));
         }
 
-        return Mono.fromCallable(() -> client.bulk(request, RequestOptions.DEFAULT));
+        return client.bulk(request, RequestOptions.DEFAULT);
     }
 }
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
index 1305420..58cd813 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
@@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,10 +43,10 @@ public class ElasticSearchHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend");
 
     private final Set<IndexName> indexNames;
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
 
     @Inject
-    ElasticSearchHealthCheck(RestHighLevelClient client, Set<IndexName> indexNames) {
+    ElasticSearchHealthCheck(ReactorElasticSearchClient client, Set<IndexName> indexNames) {
         this.client = client;
         this.indexNames = indexNames;
     }
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index 99a9f80..0fcbc11 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -18,9 +18,7 @@
  ****************************************************************/
 package org.apache.james.backends.es;
 
-import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -30,7 +28,6 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.query.QueryBuilder;
@@ -40,23 +37,25 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class ElasticSearchIndexer {
     private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
     private static final int DEFAULT_BATCH_SIZE = 100;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
 
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
     private final AliasName aliasName;
     private final DeleteByQueryPerformer deleteByQueryPerformer;
 
-    public ElasticSearchIndexer(RestHighLevelClient client,
+    public ElasticSearchIndexer(ReactorElasticSearchClient client,
                                 WriteAliasName aliasName) {
         this(client, aliasName, DEFAULT_BATCH_SIZE);
     }
 
     @VisibleForTesting
-    public ElasticSearchIndexer(RestHighLevelClient client,
+    public ElasticSearchIndexer(ReactorElasticSearchClient client,
                                 WriteAliasName aliasName,
                                 int batchSize) {
         this.client = client;
@@ -64,7 +63,7 @@ public class ElasticSearchIndexer {
         this.aliasName = aliasName;
     }
 
-    public IndexResponse index(DocumentId id, String content, RoutingKey routingKey) throws IOException {
+    public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
         checkArgument(content);
         logContent(id, content);
         return client.index(new IndexRequest(aliasName.getValue())
@@ -81,37 +80,37 @@ public class ElasticSearchIndexer {
         }
     }
 
-    public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) throws IOException {
-        try {
-            Preconditions.checkNotNull(updatedDocumentParts);
-            Preconditions.checkNotNull(routingKey);
-            BulkRequest request = new BulkRequest();
-            updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
-                new UpdateRequest(aliasName.getValue(),
-                    NodeMappingFactory.DEFAULT_MAPPING_NAME,
-                    updatedDocumentPart.getId().asString())
+    public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) {
+        Preconditions.checkNotNull(updatedDocumentParts);
+        Preconditions.checkNotNull(routingKey);
+        BulkRequest request = new BulkRequest();
+        updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
+            new UpdateRequest(aliasName.getValue(),
+                NodeMappingFactory.DEFAULT_MAPPING_NAME,
+                updatedDocumentPart.getId().asString())
                 .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)
                 .routing(routingKey.asString())));
-            return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
-        } catch (ValidationException e) {
-            LOGGER.warn("Error while updating index", e);
-            return Optional.empty();
-        }
+
+        return client.bulk(request, RequestOptions.DEFAULT)
+            .onErrorResume(ValidationException.class, exception -> {
+                LOGGER.warn("Error while updating index", exception);
+                return Mono.empty();
+            });
     }
 
-    public Optional<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) throws IOException {
-        try {
-            BulkRequest request = new BulkRequest();
-            ids.forEach(id -> request.add(
-                new DeleteRequest(aliasName.getValue())
-                    .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
-                    .id(id.asString())
-                    .routing(routingKey.asString())));
-            return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
-        } catch (ValidationException e) {
-            LOGGER.warn("Error while deleting index", e);
-            return Optional.empty();
-        }
+    public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) {
+        BulkRequest request = new BulkRequest();
+        ids.forEach(id -> request.add(
+            new DeleteRequest(aliasName.getValue())
+                .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+                .id(id.asString())
+                .routing(routingKey.asString())));
+
+        return client.bulk(request, RequestOptions.DEFAULT)
+            .onErrorResume(ValidationException.class, exception -> {
+                LOGGER.warn("Error while deleting index", exception);
+                return Mono.empty();
+            });
     }
 
     public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
index 9d6f834..07e41e8 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
@@ -31,7 +31,6 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ public class IndexCreationFactory {
             return this;
         }
 
-        public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+        public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) {
             return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build()).createIndexAndAliases(client);
         }
     }
@@ -83,7 +82,7 @@ public class IndexCreationFactory {
             this.aliases = aliases;
         }
 
-        public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+        public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) {
             Preconditions.checkNotNull(indexName);
             try {
                 createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica, waitForActiveShards));
@@ -95,7 +94,7 @@ public class IndexCreationFactory {
             return client;
         }
 
-        private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
+        private void createAliasIfNeeded(ReactorElasticSearchClient client, IndexName indexName, AliasName aliasName) throws IOException {
             if (!aliasExist(client, aliasName)) {
                 client.indices()
                     .updateAliases(
@@ -107,12 +106,12 @@ public class IndexCreationFactory {
             }
         }
 
-        private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
+        private boolean aliasExist(ReactorElasticSearchClient client, AliasName aliasName) throws IOException {
             return client.indices()
                 .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT);
         }
 
-        private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
+        private void createIndexIfNeeded(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder settings) throws IOException {
             try {
                 client.indices()
                     .create(
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
deleted file mode 100644
index 1df57b0..0000000
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.backends.es;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.elasticsearch.action.ActionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ListenerToFuture<T> implements ActionListener<T> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class);
-
-    private CompletableFuture<T> future;
-
-    public ListenerToFuture() {
-        this.future = new CompletableFuture<>();
-    }
-
-    @Override
-    public void onResponse(T t) {
-        future.complete(t);
-    }
-
-    @Override
-    public void onFailure(Exception e) {
-        LOGGER.warn("Error while waiting ElasticSearch query execution: ", e);
-        future.completeExceptionally(e);
-    }
-
-    public CompletableFuture<T> getFuture() {
-        return future;
-    }
-}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
index f4359f4..25b82e7 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
@@ -25,7 +25,6 @@ import org.apache.http.HttpStatus;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
 public class NodeMappingFactory {
@@ -55,7 +54,7 @@ public class NodeMappingFactory {
     public static final String SNOWBALL = "snowball";
     public static final String IGNORE_ABOVE = "ignore_above";
 
-    public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+    public static ReactorElasticSearchClient applyMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
         if (!mappingAlreadyExist(client, indexName)) {
             createMapping(client, indexName, mappingsSources);
         }
@@ -64,7 +63,7 @@ public class NodeMappingFactory {
 
     // ElasticSearch 6.3.2 does not support field master_timeout that is set up by 6.4.3 REST client when relying on getMapping
     @SuppressWarnings("deprecation")
-    public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName) throws IOException {
+    public static boolean mappingAlreadyExist(ReactorElasticSearchClient client, IndexName indexName) throws IOException {
         try {
             client.getLowLevelClient().performRequest("GET", "/" + indexName.getValue() + "/_mapping/" + NodeMappingFactory.DEFAULT_MAPPING_NAME);
             return true;
@@ -76,7 +75,7 @@ public class NodeMappingFactory {
         return false;
     }
 
-    public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+    public static void createMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
         client.indices().putMapping(
             new PutMappingRequest(indexName.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
new file mode 100644
index 0000000..3b0c18b
--- /dev/null
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.backends.es;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
+import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
+import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
+import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.explain.ExplainRequest;
+import org.elasticsearch.action.explain.ExplainResponse;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.main.MainResponse;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.MultiSearchResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.ClusterClient;
+import org.elasticsearch.client.IndicesClient;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.rankeval.RankEvalRequest;
+import org.elasticsearch.index.rankeval.RankEvalResponse;
+import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
+import org.elasticsearch.script.mustache.MultiSearchTemplateResponse;
+import org.elasticsearch.script.mustache.SearchTemplateRequest;
+import org.elasticsearch.script.mustache.SearchTemplateResponse;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.core.scheduler.Schedulers;
+
+public class ReactorElasticSearchClient implements AutoCloseable {
+    private final RestHighLevelClient client;
+
+    public ReactorElasticSearchClient(RestHighLevelClient client) {
+        this.client = client;
+    }
+
+    public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) {
+        return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener));
+    }
+
+    public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) {
+        return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
+    }
+
+    public ClusterClient cluster() {
+        return client.cluster();
+    }
+
+    public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
+        return client.delete(deleteRequest, options);
+    }
+
+    public Mono<DeleteStoredScriptResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
+        return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+    }
+
+    public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
+        return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+    }
+
+    public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
+        return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+    }
+
+    public RestClient getLowLevelClient() {
+        return client.getLowLevelClient();
+    }
+
+    public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
+        return toReactor(listener -> client.getScriptAsync(request, options, listener));
+    }
+
+    public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
+        return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+    }
+
+    public IndicesClient indices() {
+        return client.indices();
+    }
+
+    public MainResponse info(RequestOptions options) throws IOException {
+        return client.info(options);
+    }
+
+    public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
+        return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+    }
+
+    public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
+        return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+    }
+
+    public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
+        return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+    }
+
+    public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
+        return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
+    }
+
+    @Deprecated
+    public Mono<SearchResponse> search(SearchRequest searchRequest) {
+        return toReactor(listener -> client.searchAsync(searchRequest, listener));
+    }
+
+    public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) {
+        return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
+    }
+
+    public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
+        return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
+    }
+
+    @Override
+    public void close() throws IOException {
+        client.close();
+    }
+
+    private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
+        return Mono.<T>create(sink -> async.accept(getListener(sink)))
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
+        return new ActionListener<T>() {
+            @Override
+            public void onResponse(T t) {
+                sink.success(t);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                sink.error(e);
+            }
+        };
+    }
+}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
index 834f801..10146ea 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
@@ -19,94 +19,79 @@
 
 package org.apache.james.backends.es.search;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
-import org.apache.james.backends.es.ListenerToFuture;
-import org.apache.james.util.streams.Iterators;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
 
-import com.github.fge.lambdas.Throwing;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class ScrolledSearch {
-    private static class ScrollIterator implements Iterator<SearchResponse>, Closeable {
-        private final RestHighLevelClient client;
-        private CompletableFuture<SearchResponse> searchResponseFuture;
+    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
 
-        ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
-            this.client = client;
-            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
-            client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
+    private final ReactorElasticSearchClient client;
+    private final SearchRequest searchRequest;
 
-            this.searchResponseFuture = listener.getFuture();
-        }
+    public ScrolledSearch(ReactorElasticSearchClient client, SearchRequest searchRequest) {
+        this.client = client;
+        this.searchRequest = searchRequest;
+    }
 
-        @Override
-        public void close() throws IOException {
-            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-            clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId());
-            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
-        }
 
-        @Override
-        public boolean hasNext() {
-            SearchResponse join = searchResponseFuture.join();
-            return !allSearchResponsesConsumed(join);
-        }
+    public Flux<SearchHit> searchHits() {
+        return searchResponses()
+            .flatMap(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits()));
+    }
 
-        @Override
-        public SearchResponse next() {
-            SearchResponse result = searchResponseFuture.join();
-            ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
-            client.scrollAsync(
-                new SearchScrollRequest()
-                    .scrollId(result.getScrollId())
-                    .scroll(TIMEOUT),
-                RequestOptions.DEFAULT,
-                listener);
-            searchResponseFuture = listener.getFuture();
-            return result;
-        }
+    public Flux<SearchResponse> searchResponses() {
+        return ensureClosing(Flux.from(startScrolling(searchRequest))
+            .expand(this::nextResponse));
+    }
 
-        public Stream<SearchResponse> stream() {
-            return Iterators.toStream(this)
-                .onClose(Throwing.runnable(this::close));
-        }
+    private Mono<SearchResponse> startScrolling(SearchRequest searchRequest) {
+        return client.search(searchRequest, RequestOptions.DEFAULT);
+    }
 
-        private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
-            return searchResponse.getHits().getHits().length == 0;
+    public Mono<SearchResponse> nextResponse(SearchResponse previous) {
+        if (allSearchResponsesConsumed(previous)) {
+            return Mono.empty();
         }
-    }
 
-    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+        return client.scroll(
+            new SearchScrollRequest()
+                .scrollId(previous.getScrollId())
+                .scroll(TIMEOUT),
+            RequestOptions.DEFAULT);
+    }
 
-    private final RestHighLevelClient client;
-    private final SearchRequest searchRequest;
+    private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
+        return searchResponse.getHits().getHits().length == 0;
+    }
 
-    public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) {
-        this.client = client;
-        this.searchRequest = searchRequest;
+    private Flux<SearchResponse> ensureClosing(Flux<SearchResponse> origin) {
+        AtomicReference<SearchResponse> latest = new AtomicReference<>();
+        return origin
+            .doOnNext(latest::set)
+            .doOnTerminate(close(latest));
     }
 
-    public Stream<SearchHit> searchHits() {
-        return searchResponses()
-            .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
+    public Runnable close(AtomicReference<SearchResponse> latest) {
+        return () -> Optional.ofNullable(latest.getAndSet(null)).map(this::clearScroll);
     }
 
-    @SuppressWarnings("resource")
-    public Stream<SearchResponse> searchResponses() {
-        return new ScrollIterator(client, searchRequest)
-            .stream();
+    private Disposable clearScroll(SearchResponse current) {
+        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(current.getScrollId());
+
+        return client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe();
     }
 }
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
index 05a7ec4..4a4328c 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
@@ -25,7 +25,6 @@ import org.apache.james.backends.es.ElasticSearchClusterExtension.ElasticSearchC
 import org.awaitility.Awaitility;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.jupiter.api.Test;
@@ -75,11 +74,11 @@ interface ClientProviderImplConnectionContract {
     }
 
     default boolean isConnected(ClientProvider clientProvider) {
-        try (RestHighLevelClient client = clientProvider.get()) {
+        try (ReactorElasticSearchClient client = clientProvider.get()) {
             client.search(
                 new SearchRequest()
                     .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
-                RequestOptions.DEFAULT);
+                RequestOptions.DEFAULT).block();
             return true;
         } catch (Exception e) {
             LOGGER.info("Caught exception while trying to connect", e);
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
index 51e1b2e..0f5ddd6 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Duration;
 
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,7 +37,7 @@ class ElasticSearchHealthCheckConnectionTest {
 
     @BeforeEach
     void setUp() {
-        RestHighLevelClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get();
+        ReactorElasticSearchClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get();
 
         elasticSearchHealthCheck = new ElasticSearchHealthCheck(client, ImmutableSet.of());
     }
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index c7372a5..f5f8a40 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -33,7 +33,6 @@ import org.awaitility.core.ConditionFactory;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.jupiter.api.AfterEach;
@@ -62,7 +61,7 @@ class ElasticSearchIndexerTest {
     @RegisterExtension
     public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
     private ElasticSearchIndexer testee;
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @BeforeEach
     void setup() {
@@ -80,17 +79,18 @@ class ElasticSearchIndexerTest {
     }
 
     @Test
-    void indexMessageShouldWork() throws Exception {
+    void indexMessageShouldWork() {
         DocumentId documentId = DocumentId.fromString("1");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
         
-        testee.index(documentId, content, useDocumentId(documentId));
+        testee.index(documentId, content, useDocumentId(documentId)).block();
         elasticSearch.awaitForElasticSearch();
-        
+
         SearchResponse searchResponse = client.search(
             new SearchRequest(INDEX_NAME.getValue())
                 .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying"))),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
         assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
     }
     
@@ -101,64 +101,66 @@ class ElasticSearchIndexerTest {
     }
     
     @Test
-    void updateMessages() throws Exception {
+    void updateMessages() {
         String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
 
-        testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID));
+        testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID));
+        testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block();
         elasticSearch.awaitForElasticSearch();
 
 
         SearchResponse searchResponse = client.search(
             new SearchRequest(INDEX_NAME.getValue())
                 .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering"))),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
         assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
 
         SearchResponse searchResponse2 = client.search(
             new SearchRequest(INDEX_NAME.getValue())
                 .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged"))),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
         assertThat(searchResponse2.getHits().getTotalHits()).isEqualTo(1);
     }
 
     @Test
     void updateMessageShouldThrowWhenJsonIsNull() {
         assertThatThrownBy(() -> testee.update(ImmutableList.of(
-                new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING))
+                new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING).block())
             .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     void updateMessageShouldThrowWhenIdIsNull() {
         assertThatThrownBy(() -> testee.update(ImmutableList.of(
-                new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING))
+                new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING).block())
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
     void updateMessageShouldThrowWhenJsonIsEmpty() {
         assertThatThrownBy(() -> testee.update(ImmutableList.of(
-                new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING))
+                new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING).block())
             .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
     void updateMessageShouldThrowWhenRoutingKeyIsNull() {
         assertThatThrownBy(() -> testee.update(ImmutableList.of(
-                new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null))
+                new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null).block())
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
+    void deleteByQueryShouldWorkOnSingleMessage() {
         DocumentId documentId =  DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
         RoutingKey routingKey = useDocumentId(documentId);
 
-        testee.index(documentId, content, routingKey);
+        testee.index(documentId, content, routingKey).block();
         elasticSearch.awaitForElasticSearch();
         
         testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey);
@@ -169,25 +171,26 @@ class ElasticSearchIndexerTest {
                     new SearchRequest(INDEX_NAME.getValue())
                         .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
                     RequestOptions.DEFAULT)
+                .block()
                 .getHits().getTotalHits() == 0);
     }
 
     @Test
-    void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception {
+    void deleteByQueryShouldWorkWhenMultipleMessages() {
         DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
         
-        testee.index(documentId, content, ROUTING);
+        testee.index(documentId, content, ROUTING).block();
 
         DocumentId documentId2 = DocumentId.fromString("1:2");
         String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}";
         
-        testee.index(documentId2, content2, ROUTING);
+        testee.index(documentId2, content2, ROUTING).block();
 
         DocumentId documentId3 = DocumentId.fromString("2:3");
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
         
-        testee.index(documentId3, content3, ROUTING);
+        testee.index(documentId3, content3, ROUTING).block();
         elasticSearch.awaitForElasticSearch();
 
         testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING);
@@ -198,64 +201,67 @@ class ElasticSearchIndexerTest {
                     new SearchRequest(INDEX_NAME.getValue())
                         .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
                     RequestOptions.DEFAULT)
+                .block()
                 .getHits().getTotalHits() == 1);
     }
     
     @Test
-    void deleteMessage() throws Exception {
+    void deleteMessage() {
         DocumentId documentId = DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
 
-        testee.index(documentId, content, useDocumentId(documentId));
+        testee.index(documentId, content, useDocumentId(documentId)).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(ImmutableList.of(documentId), useDocumentId(documentId));
+        testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block();
         elasticSearch.awaitForElasticSearch();
         
         SearchResponse searchResponse = client.search(
             new SearchRequest(INDEX_NAME.getValue())
                 .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
         assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
     }
 
     @Test
-    void deleteShouldWorkWhenMultipleMessages() throws Exception {
+    void deleteShouldWorkWhenMultipleMessages() {
         DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
 
-        testee.index(documentId, content, ROUTING);
+        testee.index(documentId, content, ROUTING).block();
 
         DocumentId documentId2 = DocumentId.fromString("1:2");
         String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}";
 
-        testee.index(documentId2, content2, ROUTING);
+        testee.index(documentId2, content2, ROUTING).block();
 
         DocumentId documentId3 = DocumentId.fromString("2:3");
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
 
-        testee.index(documentId3, content3, ROUTING);
+        testee.index(documentId3, content3, ROUTING).block();
         elasticSearch.awaitForElasticSearch();
 
-        testee.delete(ImmutableList.of(documentId, documentId3), ROUTING);
+        testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block();
         elasticSearch.awaitForElasticSearch();
 
         SearchResponse searchResponse = client.search(
             new SearchRequest(INDEX_NAME.getValue())
                 .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
         assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
     }
     
     @Test
     void updateMessagesShouldNotThrowWhenEmptyList() {
-        assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING))
+        assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING).block())
             .doesNotThrowAnyException();
     }
     
     @Test
     void deleteMessagesShouldNotThrowWhenEmptyList() {
-        assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING))
+        assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING).block())
             .doesNotThrowAnyException();
     }
 }
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
index 0862711..6ff2f7e 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.IOException;
 
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -35,7 +34,7 @@ class IndexCreationFactoryTest {
 
     @RegisterExtension
     public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @BeforeEach
     void setUp() {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
index dddd34a..8c29fca 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
@@ -25,7 +25,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import java.io.IOException;
 import java.util.Optional;
 
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -42,7 +41,7 @@ class NodeMappingFactoryAuthTest {
         DockerAuthElasticSearchSingleton.INSTANCE,
         new DockerElasticSearch.WithAuth()));
 
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @BeforeEach
     void setUp(ElasticSearchClusterExtension.ElasticSearchCluster esCluster) throws Exception {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
index 7d2f52e..e686473 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
@@ -24,7 +24,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 import java.io.IOException;
 
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -38,7 +37,7 @@ class NodeMappingFactoryTest {
 
     @RegisterExtension
     public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @BeforeEach
     void setUp() throws Exception {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
index c4e9e2b..e07368e 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
@@ -29,13 +29,13 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.IndexCreationFactory;
 import org.apache.james.backends.es.IndexName;
 import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.backends.es.ReadAliasName;
 import org.awaitility.Duration;
 import org.awaitility.core.ConditionFactory;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
@@ -56,7 +56,7 @@ class ScrolledSearchTest {
 
     @RegisterExtension
     public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @BeforeEach
     void setUp() {
@@ -81,7 +81,7 @@ class ScrolledSearchTest {
                 .query(QueryBuilders.matchAllQuery())
                 .size(SIZE));
 
-        assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+        assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
             .isEmpty();
     }
 
@@ -92,7 +92,8 @@ class ScrolledSearchTest {
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+        .block();
 
         elasticSearch.awaitForElasticSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
@@ -103,26 +104,28 @@ class ScrolledSearchTest {
                 .query(QueryBuilders.matchAllQuery())
                 .size(SIZE));
 
-        assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+        assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
             .extracting(SearchHit::getId)
             .containsOnly(id);
     }
 
     @Test
-    void scrollIterableShouldWorkWhenSizeElement() throws Exception {
+    void scrollIterableShouldWorkWhenSizeElement() {
         String id1 = "1";
         client.index(new IndexRequest(INDEX_NAME.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id1)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
 
         String id2 = "2";
         client.index(new IndexRequest(INDEX_NAME.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id2)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
 
         elasticSearch.awaitForElasticSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
@@ -133,33 +136,36 @@ class ScrolledSearchTest {
                 .query(QueryBuilders.matchAllQuery())
                 .size(SIZE));
 
-        assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+        assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
             .extracting(SearchHit::getId)
             .containsOnly(id1, id2);
     }
 
     @Test
-    void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
+    void scrollIterableShouldWorkWhenMoreThanSizeElement() {
         String id1 = "1";
         client.index(new IndexRequest(INDEX_NAME.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id1)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
 
         String id2 = "2";
         client.index(new IndexRequest(INDEX_NAME.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id2)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
 
         String id3 = "3";
         client.index(new IndexRequest(INDEX_NAME.getValue())
                 .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
                 .id(id3)
                 .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT);
+            RequestOptions.DEFAULT)
+            .block();
 
         elasticSearch.awaitForElasticSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
@@ -170,18 +176,19 @@ class ScrolledSearchTest {
                 .query(QueryBuilders.matchAllQuery())
                 .size(SIZE));
 
-        assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+        assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
             .extracting(SearchHit::getId)
             .containsOnly(id1, id2, id3);
     }
 
-    private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
+    private void hasIdsInIndex(ReactorElasticSearchClient client, String... ids) {
         SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
             .source(new SearchSourceBuilder()
                 .query(QueryBuilders.matchAllQuery()));
 
         SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
+            .block()
             .getHits()
             .getHits();
 
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
index 4b76dad..9871600 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
@@ -25,17 +25,17 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.IndexCreationFactory;
 import org.apache.james.backends.es.IndexName;
 import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.backends.es.ReadAliasName;
 import org.apache.james.backends.es.WriteAliasName;
-import org.elasticsearch.client.RestHighLevelClient;
 
 public class MailboxIndexCreationUtil {
 
-    public static RestHighLevelClient prepareClient(RestHighLevelClient client,
-                                       ReadAliasName readAlias,
-                                       WriteAliasName writeAlias,
-                                       IndexName indexName,
-                                       ElasticSearchConfiguration configuration) throws IOException {
+    public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client,
+                                                           ReadAliasName readAlias,
+                                                           WriteAliasName writeAlias,
+                                                           IndexName indexName,
+                                                           ElasticSearchConfiguration configuration) throws IOException {
         return NodeMappingFactory.applyMapping(
             new IndexCreationFactory(configuration)
                 .useIndex(indexName)
@@ -46,7 +46,7 @@ public class MailboxIndexCreationUtil {
             MailboxMappingFactory.getMappingContent());
     }
 
-    public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException {
+    public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException {
         return prepareClient(client,
             MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS,
             MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index c3d8595..1430111 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -52,7 +52,6 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
-import org.apache.james.util.OptionalUtils;
 import org.elasticsearch.index.query.TermQueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +62,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
     public static class ElasticSearchListeningMessageSearchIndexGroup extends Group {
 
@@ -112,7 +113,8 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
 
         return searcher
             .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
-            .map(SearchResult::getMessageUid);
+            .map(SearchResult::getMessageUid)
+            .toStream();
     }
     
     @Override
@@ -123,15 +125,14 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             return ImmutableList.of();
         }
 
-        try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) {
-            return searchResults
-                .peek(this::logIfNoMessageId)
-                .map(SearchResult::getMessageId)
-                .flatMap(OptionalUtils::toStream)
-                .distinct()
-                .limit(limit)
-                .collect(Guavate.toImmutableList());
-        }
+        return searcher.search(mailboxIds, searchQuery, Optional.empty())
+            .doOnNext(this::logIfNoMessageId)
+            .map(SearchResult::getMessageId)
+            .flatMap(Mono::justOrEmpty)
+            .distinct()
+            .take(limit)
+            .collect(Guavate.toImmutableList())
+            .block();
     }
 
     @Override
@@ -144,7 +145,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
 
         String jsonContent = generateIndexedJson(mailbox, message, session);
 
-        elasticSearchIndexer.index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()));
+        elasticSearchIndexer
+            .index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()))
+            .block();
     }
 
     private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
@@ -162,12 +165,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
 
     @Override
-    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws IOException {
+    public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
             elasticSearchIndexer
                 .delete(expungedUids.stream()
                     .map(uid ->  indexIdFor(mailbox, uid))
                     .collect(Guavate.toImmutableList()),
-                    routingKeyFactory.from(mailbox.getMailboxId()));
+                    routingKeyFactory.from(mailbox.getMailboxId()))
+                .block();
     }
 
     @Override
@@ -181,14 +185,16 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
 
     @Override
-    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException {
+    public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
         ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream()
             .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
                 updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
                 .sneakyThrow())
             .collect(Guavate.toImmutableList());
 
-        elasticSearchIndexer.update(updates, routingKeyFactory.from(mailbox.getMailboxId()));
+        elasticSearchIndexer
+            .update(updates, routingKeyFactory.from(mailbox.getMailboxId()))
+            .block();
     }
 
     private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException {
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 85cceb1..7dd6509 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -21,10 +21,10 @@ package org.apache.james.mailbox.elasticsearch.search;
 
 import java.util.Collection;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import org.apache.james.backends.es.AliasName;
 import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.backends.es.ReadAliasName;
 import org.apache.james.backends.es.RoutingKey;
 import org.apache.james.backends.es.search.ScrolledSearch;
@@ -37,7 +37,6 @@ import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.SearchQuery;
 import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
@@ -47,6 +46,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public class ElasticSearchSearcher {
     public static final int DEFAULT_SEARCH_SIZE = 100;
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
@@ -55,7 +56,7 @@ public class ElasticSearchSearcher {
         JsonMessageConstants.UID, JsonMessageConstants.MESSAGE_ID);
     private static final int MAX_ROUTING_KEY = 5;
 
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
     private final QueryConverter queryConverter;
     private final int size;
     private final MailboxId.Factory mailboxIdFactory;
@@ -63,7 +64,7 @@ public class ElasticSearchSearcher {
     private final AliasName aliasName;
     private final RoutingKey.Factory<MailboxId> routingKeyFactory;
 
-    public ElasticSearchSearcher(RestHighLevelClient client, QueryConverter queryConverter, int size,
+    public ElasticSearchSearcher(ReactorElasticSearchClient client, QueryConverter queryConverter, int size,
                                  MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory,
                                  ReadAliasName aliasName, RoutingKey.Factory<MailboxId> routingKeyFactory) {
         this.client = client;
@@ -75,14 +76,14 @@ public class ElasticSearchSearcher {
         this.routingKeyFactory = routingKeyFactory;
     }
 
-    public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
-                                                          Optional<Integer> limit) {
+    public Flux<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
+                                                        Optional<Integer> limit) {
         SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit);
-        Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
+        Flux<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
             .searchHits()
             .flatMap(this::extractContentFromHit);
 
-        return limit.map(pairStream::limit)
+        return limit.map(pairStream::take)
             .orElse(pairStream);
     }
 
@@ -122,20 +123,20 @@ public class ElasticSearchSearcher {
             .orElse(size);
     }
 
-    private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
+    private Flux<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
         DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
         DocumentField uid = hit.field(JsonMessageConstants.UID);
         Optional<DocumentField> id = retrieveMessageIdField(hit);
         if (mailboxId != null && uid != null) {
             Number uidAsNumber = uid.getValue();
-            return Stream.of(
+            return Flux.just(
                 new MessageSearchIndex.SearchResult(
                     id.map(field -> messageIdFactory.fromString(field.getValue())),
                     mailboxIdFactory.fromString(mailboxId.getValue()),
                     MessageUid.of(uidAsNumber.longValue())));
         } else {
             LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId());
-            return Stream.empty();
+            return Flux.empty();
         }
     }
 
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 04bc2d6..1813ef3 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -27,6 +27,7 @@ import java.time.ZoneId;
 
 import org.apache.james.backends.es.DockerElasticSearchExtension;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
 import org.apache.james.mailbox.MessageManager;
@@ -48,7 +49,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
 import org.apache.james.mailbox.tika.TikaTextExtractor;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.mime4j.dom.Message;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -67,7 +67,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
     DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
 
     TikaTextExtractor textExtractor;
-    RestHighLevelClient client;
+    ReactorElasticSearchClient client;
 
     @AfterEach
     void tearDown() throws IOException {
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index f6d1391..95b67f8 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -33,6 +33,7 @@ import javax.mail.util.SharedByteArrayInputStream;
 
 import org.apache.james.backends.es.DockerElasticSearchExtension;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.DefaultMailboxes;
 import org.apache.james.mailbox.MailboxSession;
@@ -73,7 +74,6 @@ import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.awaitility.Duration;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -155,7 +155,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
 
         InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
 
-        RestHighLevelClient client = MailboxIndexCreationUtil.prepareDefaultClient(
+        ReactorElasticSearchClient client = MailboxIndexCreationUtil.prepareDefaultClient(
             elasticSearch.getDockerElasticSearch().clientProvider().get(),
             elasticSearch.getDockerElasticSearch().configuration());
 
@@ -256,7 +256,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
 
         assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1))
-            .isInstanceOf(IOException.class);
+            .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
     }
@@ -330,7 +330,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
 
         assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
-            .isInstanceOf(IOException.class);
+            .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
     }
@@ -413,7 +413,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
             .build();
 
         assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)))
-            .isInstanceOf(IOException.class);
+            .hasCauseInstanceOf(IOException.class);
 
         elasticSearch.getDockerElasticSearch().unpause();
     }
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
index f8be9a0..f7d6564 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
 
 import org.apache.james.backends.es.DockerElasticSearchExtension;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MailboxSessionUtil;
@@ -58,7 +59,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
 import org.apache.james.mailbox.tika.TikaTextExtractor;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.mime4j.dom.Message;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -80,7 +80,7 @@ class ElasticSearchSearcherTest {
     DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
 
     TikaTextExtractor textExtractor;
-    RestHighLevelClient client;
+    ReactorElasticSearchClient client;
     private InMemoryMailboxManager storeMailboxManager;
 
     @BeforeEach
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
index 0a1e082..8765345 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
@@ -21,13 +21,11 @@ package org.apache.james.quota.search.elasticsearch;
 
 import static org.apache.james.quota.search.elasticsearch.json.JsonMessageConstants.USER;
 
-import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Stream;
 
 import org.apache.james.backends.es.AliasName;
 import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.backends.es.ReadAliasName;
 import org.apache.james.backends.es.search.ScrolledSearch;
 import org.apache.james.core.Username;
@@ -35,7 +33,6 @@ import org.apache.james.quota.search.QuotaQuery;
 import org.apache.james.quota.search.QuotaSearcher;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -44,14 +41,16 @@ import org.elasticsearch.search.sort.SortOrder;
 
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Flux;
+
 public class ElasticSearchQuotaSearcher implements QuotaSearcher {
     private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
 
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
     private final AliasName readAlias;
     private final QuotaQueryConverter quotaQueryConverter;
 
-    public ElasticSearchQuotaSearcher(RestHighLevelClient client, ReadAliasName readAlias) {
+    public ElasticSearchQuotaSearcher(ReactorElasticSearchClient client, ReadAliasName readAlias) {
         this.client = client;
         this.readAlias = readAlias;
         this.quotaQueryConverter = new QuotaQueryConverter();
@@ -60,18 +59,17 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
     @Override
     public List<Username> search(QuotaQuery query) {
         try {
-            try (Stream<SearchHit> searchHits = searchHits(query)) {
-                return searchHits
-                    .map(SearchHit::getId)
-                    .map(Username::of)
-                    .collect(Guavate.toImmutableList());
-            }
-        } catch (IOException e) {
+            return searchHits(query)
+                .map(SearchHit::getId)
+                .map(Username::of)
+                .collect(Guavate.toImmutableList())
+                .block();
+        } catch (Exception e) {
             throw new RuntimeException("Unexpected exception while executing " + query, e);
         }
     }
 
-    private Stream<SearchHit> searchHits(QuotaQuery query) throws IOException {
+    private Flux<SearchHit> searchHits(QuotaQuery query) {
         if (query.getLimit().isLimited()) {
             return executeSingleSearch(query);
         } else {
@@ -79,7 +77,7 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
         }
     }
 
-    private Stream<SearchHit> executeSingleSearch(QuotaQuery query) throws IOException {
+    private Flux<SearchHit> executeSingleSearch(QuotaQuery query) {
         SearchSourceBuilder searchSourceBuilder = searchSourceBuilder(query)
             .from(query.getOffset().getValue());
         query.getLimit().getValue()
@@ -89,12 +87,11 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
             .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
             .source(searchSourceBuilder);
 
-        return Arrays.stream(client.search(searchRequest, RequestOptions.DEFAULT)
-            .getHits()
-            .getHits());
+        return client.search(searchRequest, RequestOptions.DEFAULT)
+            .flatMapMany(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits()));
     }
 
-    private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) {
+    private Flux<SearchHit> executeScrolledSearch(QuotaQuery query) {
         return new ScrolledSearch(client,
             new SearchRequest(readAlias.getValue())
                 .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
index c1918a9..cf1aad6 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
@@ -26,11 +26,11 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.IndexCreationFactory;
 import org.apache.james.backends.es.IndexName;
 import org.apache.james.backends.es.NodeMappingFactory;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 
 public class QuotaSearchIndexCreationUtil {
 
-    public static RestHighLevelClient prepareClient(RestHighLevelClient client,
+    public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client,
                                        AliasName readAlias,
                                        AliasName writeAlias,
                                        IndexName indexName,
@@ -46,7 +46,7 @@ public class QuotaSearchIndexCreationUtil {
             QuotaRatioMappingFactory.getMappingContent());
     }
 
-    public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException {
+    public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException {
         return prepareClient(client,
             QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS,
             QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
index b124c18..0a80a56 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
@@ -70,9 +70,11 @@ public class ElasticSearchQuotaMailboxListener implements MailboxListener.GroupM
 
     private void handleEvent(QuotaUsageUpdatedEvent event) throws IOException {
         Username user = event.getUsername();
-        indexer.index(toDocumentId(user),
-            quotaRatioToElasticSearchJson.convertToJson(event),
-            routingKeyFactory.from(user));
+        indexer
+            .index(toDocumentId(user),
+                quotaRatioToElasticSearchJson.convertToJson(event),
+                routingKeyFactory.from(user))
+            .block();
     }
 
     private DocumentId toDocumentId(Username user) {
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
index cc7d8b2..991c5d2 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.james.backends.es.DockerElasticSearch;
 import org.apache.james.backends.es.DockerElasticSearchSingleton;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.dnsservice.api.DNSService;
 import org.apache.james.domainlist.memory.MemoryDomainList;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
@@ -34,7 +35,6 @@ import org.apache.james.quota.search.QuotaSearchTestSystem;
 import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
 import org.apache.james.user.memory.MemoryUsersRepository;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -45,7 +45,7 @@ import org.junit.jupiter.api.extension.ParameterResolver;
 public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
 
     private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE;
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @Override
     public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
index ba482aa..9e5671b 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
@@ -30,6 +30,7 @@ import org.apache.james.backends.es.DockerElasticSearchExtension;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
 import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.mailbox.events.Event;
 import org.apache.james.mailbox.events.Group;
 import org.apache.james.mailbox.quota.QuotaFixture.Counts;
@@ -41,7 +42,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.jupiter.api.AfterEach;
@@ -58,7 +58,7 @@ class ElasticSearchQuotaMailboxListenerTest {
     DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
 
     ElasticSearchQuotaMailboxListener quotaMailboxListener;
-    RestHighLevelClient client;
+    ReactorElasticSearchClient client;
 
     @BeforeEach
     void setUp() throws IOException {
@@ -100,10 +100,11 @@ class ElasticSearchQuotaMailboxListenerTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        SearchResponse searchResponse = client.search(new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue())
-                .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
-                .source(new SearchSourceBuilder()
-                    .query(QueryBuilders.matchAllQuery())));
+        SearchRequest searchRequest = new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue())
+            .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+            .source(new SearchSourceBuilder()
+                .query(QueryBuilders.matchAllQuery()));
+        SearchResponse searchResponse = client.search(searchRequest).block();
 
         assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
     }
diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
index 38e1d5c..233fb26 100644
--- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
+++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch;
 import org.apache.james.backends.es.DockerElasticSearchSingleton;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.quota.QuotaCountLimit;
 import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.imap.api.process.ImapProcessor;
@@ -54,7 +55,6 @@ import org.apache.james.metrics.logger.DefaultMetricFactory;
 import org.apache.james.mpt.api.ImapFeatures;
 import org.apache.james.mpt.api.ImapFeatures.Feature;
 import org.apache.james.mpt.host.JamesImapHostSystem;
-import org.elasticsearch.client.RestHighLevelClient;
 
 public class ElasticSearchHostSystem extends JamesImapHostSystem {
 
@@ -63,7 +63,7 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem {
 
     private DockerElasticSearch dockerElasticSearch;
     private StoreMailboxManager mailboxManager;
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @Override
     public void beforeTest() throws Exception {
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
index 7aab691..ed9b27e 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
@@ -24,10 +24,10 @@ import java.util.Set;
 import org.apache.james.backends.es.ClientProvider;
 import org.apache.james.backends.es.ElasticSearchHealthCheck;
 import org.apache.james.backends.es.IndexName;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.mailbox.elasticsearch.ElasticSearchMailboxConfiguration;
 import org.apache.james.quota.search.elasticsearch.ElasticSearchQuotaConfiguration;
-import org.elasticsearch.client.RestHighLevelClient;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
@@ -41,7 +41,7 @@ public class ElasticSearchClientModule extends AbstractModule {
     @Override
     protected void configure() {
         bind(ClientProvider.class).in(Scopes.SINGLETON);
-        bind(RestHighLevelClient.class).toProvider(ClientProvider.class);
+        bind(ReactorElasticSearchClient.class).toProvider(ClientProvider.class);
 
         Multibinder.newSetBinder(binder(), HealthCheck.class)
             .addBinding()
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index bcc8148..b02c837 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.backends.es.RoutingKey;
 import org.apache.james.lifecycle.api.StartUpCheck;
 import org.apache.james.lifecycle.api.Startable;
@@ -51,7 +52,6 @@ import org.apache.james.mailbox.store.search.MessageSearchIndex;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,12 +68,12 @@ public class ElasticSearchMailboxModule extends AbstractModule {
 
         private final ElasticSearchConfiguration configuration;
         private final ElasticSearchMailboxConfiguration mailboxConfiguration;
-        private final RestHighLevelClient client;
+        private final ReactorElasticSearchClient client;
 
         @Inject
         MailboxIndexCreator(ElasticSearchConfiguration configuration,
                             ElasticSearchMailboxConfiguration mailboxConfiguration,
-                            RestHighLevelClient client) {
+                            ReactorElasticSearchClient client) {
             this.configuration = configuration;
             this.mailboxConfiguration = mailboxConfiguration;
             this.client = client;
@@ -114,7 +114,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
     @Provides
     @Singleton
     @Named(MailboxElasticSearchConstants.InjectionNames.MAILBOX)
-    private ElasticSearchIndexer createMailboxElasticSearchIndexer(RestHighLevelClient client,
+    private ElasticSearchIndexer createMailboxElasticSearchIndexer(ReactorElasticSearchClient client,
                                                                    ElasticSearchMailboxConfiguration configuration) {
         return new ElasticSearchIndexer(
             client,
@@ -123,7 +123,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
 
     @Provides
     @Singleton
-    private ElasticSearchSearcher createMailboxElasticSearchSearcher(RestHighLevelClient client,
+    private ElasticSearchSearcher createMailboxElasticSearchSearcher(ReactorElasticSearchClient client,
                                                                      QueryConverter queryConverter,
                                                                      MailboxId.Factory mailboxIdFactory,
                                                                      MessageId.Factory messageIdFactory,
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
index eb4f9b4..123b02f 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
@@ -30,6 +30,7 @@ import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.lifecycle.api.Startable;
 import org.apache.james.mailbox.events.MailboxListener;
 import org.apache.james.quota.search.QuotaSearcher;
@@ -42,7 +43,6 @@ import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearc
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
 import org.apache.james.utils.PropertiesProvider;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,12 +57,12 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
     static class ElasticSearchQuotaIndexCreator implements Startable {
         private final ElasticSearchConfiguration configuration;
         private final ElasticSearchQuotaConfiguration quotaConfiguration;
-        private final RestHighLevelClient client;
+        private final ReactorElasticSearchClient client;
 
         @Inject
         ElasticSearchQuotaIndexCreator(ElasticSearchConfiguration configuration,
                                        ElasticSearchQuotaConfiguration quotaConfiguration,
-                                       RestHighLevelClient client) {
+                                       ReactorElasticSearchClient client) {
             this.configuration = configuration;
             this.quotaConfiguration = quotaConfiguration;
             this.client = client;
@@ -88,7 +88,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
 
     @Provides
     @Singleton
-    public QuotaSearcher provideSearcher(RestHighLevelClient client, ElasticSearchQuotaConfiguration configuration) {
+    public QuotaSearcher provideSearcher(ReactorElasticSearchClient client, ElasticSearchQuotaConfiguration configuration) {
         return new ElasticSearchQuotaSearcher(client,
             configuration.getReadAliasQuotaRatioName());
     }
@@ -107,7 +107,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
 
     @Provides
     @Singleton
-    public ElasticSearchQuotaMailboxListener provideListener(RestHighLevelClient client,
+    public ElasticSearchQuotaMailboxListener provideListener(ReactorElasticSearchClient client,
                                                              ElasticSearchQuotaConfiguration configuration) {
         return new ElasticSearchQuotaMailboxListener(
             new ElasticSearchIndexer(client,
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
index 4bf89d7..3565b38 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
@@ -24,10 +24,10 @@ import java.io.IOException;
 import javax.inject.Inject;
 
 import org.apache.james.backends.es.ElasticSearchConfiguration;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.lifecycle.api.StartUpCheck;
 import org.elasticsearch.Version;
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,10 +40,10 @@ public class ElasticSearchStartUpCheck implements StartUpCheck {
 
     public static final String CHECK_NAME = "ElasticSearchStartUpCheck";
 
-    private final RestHighLevelClient client;
+    private final ReactorElasticSearchClient client;
 
     @Inject
-    private ElasticSearchStartUpCheck(RestHighLevelClient client) {
+    private ElasticSearchStartUpCheck(ReactorElasticSearchClient client) {
         this.client = client;
     }
 
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
index 23e7cff..81aa4c0 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
@@ -34,6 +34,7 @@ import java.util.TimerTask;
 import java.util.stream.Collectors;
 
 import org.apache.commons.net.imap.IMAPClient;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.AccessToken;
 import org.apache.james.jmap.draft.JmapGuiceProbe;
@@ -45,7 +46,6 @@ import org.apache.james.modules.protocols.ImapGuiceProbe;
 import org.apache.james.utils.DataProbeImpl;
 import org.awaitility.Duration;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.junit.jupiter.api.AfterEach;
@@ -152,12 +152,13 @@ class ESReporterTest {
     }
 
     private boolean checkMetricRecordedInElasticSearch() {
-        try (RestHighLevelClient client = elasticSearchExtension.getDockerES().clientProvider().get()) {
+        try (ReactorElasticSearchClient client = elasticSearchExtension.getDockerES().clientProvider().get()) {
             SearchRequest searchRequest = new SearchRequest()
                 .source(new SearchSourceBuilder()
                     .query(QueryBuilders.matchAllQuery()));
             return !Arrays.stream(client
                     .search(searchRequest)
+                    .block()
                     .getHits()
                     .getHits())
                 .filter(searchHit -> searchHit.getIndex().startsWith(TestDockerESMetricReporterModule.METRICS_INDEX))
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
index e57771f..392b859 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch;
 import org.apache.james.backends.es.DockerElasticSearchSingleton;
 import org.apache.james.backends.es.ElasticSearchConfiguration;
 import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.apache.james.dnsservice.api.DNSService;
 import org.apache.james.domainlist.memory.MemoryDomainList;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
@@ -39,7 +40,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory;
 import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
 import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
 import org.apache.james.user.memory.MemoryUsersRepository;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -53,7 +53,7 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef
     private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE;
     private WebAdminQuotaSearchTestSystem restQuotaSearchTestSystem;
     private TemporaryFolder temporaryFolder = new TemporaryFolder();
-    private RestHighLevelClient client;
+    private ReactorElasticSearchClient client;
 
     @Override
     public void beforeEach(ExtensionContext context) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org