You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/06 03:02:40 UTC

[james-project] 01/12: JAMES-3771 Migrate backend opensearch high level to new java client

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c7cb996ee941b649f4d270ce4365aa7eaf910ed6
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue Jun 21 15:03:11 2022 +0700

    JAMES-3771 Migrate backend opensearch high level to new java client
---
 backends-common/opensearch/pom.xml                 |  15 +-
 .../james/backends/opensearch/ClientProvider.java  |  34 ++-
 .../opensearch/DeleteByQueryPerformer.java         |  41 ++--
 .../james/backends/opensearch/DocumentId.java      |   3 +-
 .../backends/opensearch/IndexCreationFactory.java  | 254 ++++++++++++---------
 .../backends/opensearch/OpenSearchHealthCheck.java |  38 +--
 .../backends/opensearch/OpenSearchIndexer.java     | 121 ++++++----
 .../opensearch/ReactorOpenSearchClient.java        | 181 ++++++---------
 .../james/backends/opensearch/RoutingKey.java      |   3 +-
 .../backends/opensearch/UpdatedRepresentation.java |   3 +-
 .../backends/opensearch/search/ScrolledSearch.java |  74 +++---
 .../ClientProviderImplConnectionContract.java      |  12 +-
 .../opensearch/DockerOpenSearchExtension.java      |  42 ++--
 .../opensearch/IndexCreationFactoryTest.java       | 213 ++++++++---------
 .../opensearch/OpenSearchHealthCheckTest.java      |  52 ++---
 .../backends/opensearch/OpenSearchIndexerTest.java |  76 +++---
 .../opensearch/search/ScrolledSearchTest.java      | 135 ++++++-----
 17 files changed, 690 insertions(+), 607 deletions(-)

diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml
index ce40d7dca4..a53014a685 100644
--- a/backends-common/opensearch/pom.xml
+++ b/backends-common/opensearch/pom.xml
@@ -47,14 +47,10 @@
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <!-- Prevents https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-28491 -->
         <dependency>
-            <groupId>com.fasterxml.jackson.dataformat</groupId>
-            <artifactId>jackson-dataformat-cbor</artifactId>
-            <version>2.13.4</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.github.fge</groupId>
             <artifactId>throwing-lambdas</artifactId>
@@ -86,7 +82,12 @@
         </dependency>
         <dependency>
             <groupId>org.opensearch.client</groupId>
-            <artifactId>opensearch-rest-high-level-client</artifactId>
+            <artifactId>opensearch-java</artifactId>
+            <version>2.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.opensearch.client</groupId>
+            <artifactId>opensearch-rest-client</artifactId>
             <version>2.3.0</version>
         </dependency>
         <dependency>
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
index 52bf765741..89a104bb8b 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
@@ -46,7 +46,9 @@ import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.TrustStrategy;
 import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.opensearch.client.RestClient;
-import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchAsyncClient;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,7 +183,8 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
 
     private final OpenSearchConfiguration configuration;
-    private final RestHighLevelClient openSearchRestHighLevelClient;
+    private final RestClient lowLevelRestClient;
+    private final OpenSearchAsyncClient openSearchClient;
     private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
     private final ReactorOpenSearchClient client;
 
@@ -189,28 +192,35 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
     public ClientProvider(OpenSearchConfiguration configuration) {
         this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
         this.configuration = configuration;
-        this.openSearchRestHighLevelClient = connect(configuration);
-        this.client = new ReactorOpenSearchClient(this.openSearchRestHighLevelClient);
+        this.lowLevelRestClient = buildRestClient();
+        this.openSearchClient = connect();
+        this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
     }
 
-    private RestHighLevelClient connect(OpenSearchConfiguration configuration) {
+    private RestClient buildRestClient() {
+        return RestClient.builder(hostsToHttpHosts())
+            .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
+            .build();
+    }
+
+    private OpenSearchAsyncClient connect() {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
         boolean suppressLeadingZeroElements = true;
         boolean suppressTrailingZeroElements = true;
         return Mono.fromCallable(this::connectToCluster)
             .doOnError(e -> LOGGER.warn("Error establishing OpenSearch connection. Next retry scheduled in {}",
                 DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
             .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.boundedElastic()))
+            .publishOn(Schedulers.boundedElastic()) // bounded pool: Schedulers.elastic() is deprecated and unbounded
             .block();
     }
 
-    private RestHighLevelClient connectToCluster() {
+    private OpenSearchAsyncClient connectToCluster() {
         LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());
 
-        return new RestHighLevelClient(
-            RestClient
-                .builder(hostsToHttpHosts())
-                .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure));
+        RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
+
+        return new OpenSearchAsyncClient(transport);
     }
 
     private HttpHost[] hostsToHttpHosts() {
@@ -226,6 +236,6 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
 
     @PreDestroy
     public void close() throws IOException {
-        openSearchRestHighLevelClient.close();
+        lowLevelRestClient.close();
     }
 }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
index a0261badd8..7ea40f627e 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
@@ -19,9 +19,10 @@
 
 package org.apache.james.backends.opensearch;
 
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.reindex.DeleteByQueryRequest;
+import java.io.IOException;
+
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
 
 import reactor.core.publisher.Mono;
 
@@ -35,20 +36,32 @@ public class DeleteByQueryPerformer {
         this.aliasName = aliasName;
     }
 
-    public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) {
-        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue());
-        deleteRequest.setQuery(queryBuilder);
-        deleteRequest.setRouting(routingKey.asString());
+    public Mono<Void> perform(Query query, RoutingKey routingKey) {
+        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder()
+            .index(aliasName.getValue())
+            .query(query)
+            .routing(routingKey.asString())
+            .build();
 
-        return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT)
-            .then();
+        try {
+            return client.deleteByQuery(deleteRequest)
+                .then();
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 
-    public Mono<Void> perform(QueryBuilder queryBuilder) {
-        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue());
-        deleteRequest.setQuery(queryBuilder);
+    public Mono<Void> perform(Query query) {
+        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder()
+            .index(aliasName.getValue())
+            .query(query)
+            .build();
 
-        return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT)
-            .then();
+        try {
+            return client.deleteByQuery(deleteRequest)
+                .then();
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
index 42f650b03b..be9a994539 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
@@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch;
 
 import java.util.Objects;
 
-import org.opensearch.common.Strings;
-
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 public class DocumentId {
 
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
index 48756d104c..f314b373cc 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
@@ -19,58 +19,76 @@
 
 package org.apache.james.backends.opensearch;
 
-import static org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
-import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
-
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
-import org.opensearch.OpenSearchStatusException;
-import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.indices.CreateIndexRequest;
-import org.opensearch.client.indices.GetIndexRequest;
-import org.opensearch.common.Strings;
-import org.opensearch.common.xcontent.XContentBuilder;
-import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.client.opensearch._types.OpenSearchException;
+import org.opensearch.client.opensearch._types.WaitForActiveShards;
+import org.opensearch.client.opensearch._types.analysis.Analyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomNormalizer;
+import org.opensearch.client.opensearch._types.analysis.Normalizer;
+import org.opensearch.client.opensearch._types.analysis.SnowballLanguage;
+import org.opensearch.client.opensearch._types.analysis.SnowballTokenFilter;
+import org.opensearch.client.opensearch._types.analysis.Tokenizer;
+import org.opensearch.client.opensearch._types.mapping.TypeMapping;
+import org.opensearch.client.opensearch.indices.CreateIndexRequest;
+import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
+import org.opensearch.client.opensearch.indices.ExistsRequest;
+import org.opensearch.client.opensearch.indices.IndexSettings;
+import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis;
+import org.opensearch.client.opensearch.indices.UpdateAliasesRequest;
+import org.opensearch.client.opensearch.indices.update_aliases.Action;
+import org.opensearch.client.opensearch.indices.update_aliases.AddAction;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
 public class IndexCreationFactory {
 
-    public static class IndexCreationCustomElement {
-        public static IndexCreationCustomElement EMPTY = from("{}");
+    public static class IndexCreationCustomAnalyzer {
+        private final String key;
+        private final Analyzer analyzer;
 
-        public static IndexCreationCustomElement from(String value) {
-            try {
-                new ObjectMapper().readTree(value);
-            } catch (JsonProcessingException e) {
-                throw new IllegalArgumentException("value must be a valid json");
-            }
-            return new IndexCreationCustomElement(value);
+        public IndexCreationCustomAnalyzer(String key, Analyzer analyzer) { // both arguments are mandatory
+            this.key = Preconditions.checkNotNull(key, "'key' is mandatory");
+            this.analyzer = Preconditions.checkNotNull(analyzer, "'analyzer' is mandatory");
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public Analyzer getAnalyzer() {
+            return analyzer;
         }
+    }
 
-        private final String payload;
+    public static class IndexCreationCustomTokenizer {
+        private final String key;
+        private final Tokenizer tokenizer;
 
-        IndexCreationCustomElement(String payload) {
-            this.payload = payload;
+        public IndexCreationCustomTokenizer(String key, Tokenizer tokenizer) { // both arguments are mandatory
+            this.key = Preconditions.checkNotNull(key, "'key' is mandatory");
+            this.tokenizer = Preconditions.checkNotNull(tokenizer, "'tokenizer' is mandatory");
         }
 
-        public String getPayload() {
-            return payload;
+        public String getKey() {
+            return key;
+        }
+
+        public Tokenizer getTokenizer() {
+            return tokenizer;
         }
     }
 
@@ -103,8 +121,8 @@ public class IndexCreationFactory {
                 private final int waitForActiveShards;
                 private final IndexName indexName;
                 private final ImmutableList.Builder<AliasName> aliases;
-                private Optional<IndexCreationCustomElement> customAnalyzers;
-                private Optional<IndexCreationCustomElement> customTokenizers;
+                private final ImmutableList.Builder<IndexCreationCustomAnalyzer> customAnalyzers;
+                private final ImmutableList.Builder<IndexCreationCustomTokenizer> customTokenizers;
 
                 FinalStage(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName) {
                     this.nbShards = nbShards;
@@ -112,8 +130,8 @@ public class IndexCreationFactory {
                     this.waitForActiveShards = waitForActiveShards;
                     this.indexName = indexName;
                     this.aliases = ImmutableList.builder();
-                    this.customAnalyzers = Optional.empty();
-                    this.customTokenizers = Optional.empty();
+                    this.customAnalyzers = ImmutableList.builder();
+                    this.customTokenizers = ImmutableList.builder();
                 }
 
                 public FinalStage addAlias(AliasName... aliases) {
@@ -126,29 +144,29 @@ public class IndexCreationFactory {
                     return this;
                 }
 
-                public FinalStage customAnalyzers(IndexCreationCustomElement customAnalyzers) {
-                    this.customAnalyzers = Optional.of(customAnalyzers);
+                public FinalStage customAnalyzers(IndexCreationCustomAnalyzer... customAnalyzers) {
+                    this.customAnalyzers.add(customAnalyzers);
                     return this;
                 }
 
-                public FinalStage customTokenizers(IndexCreationCustomElement customTokenizers) {
-                    this.customTokenizers = Optional.of(customTokenizers);
+                public FinalStage customTokenizers(IndexCreationCustomTokenizer... customTokenizers) {
+                    this.customTokenizers.add(customTokenizers);
                     return this;
                 }
 
                 public IndexCreationPerformer build() {
-                    return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers, customTokenizers);
+                    return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers.build(), customTokenizers.build());
                 }
 
                 public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client) {
                     return build().createIndexAndAliases(client, Optional.empty(), Optional.empty());
                 }
 
-                public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, XContentBuilder mappingContent) {
+                public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, TypeMapping mappingContent) {
                     return build().createIndexAndAliases(client, Optional.empty(), Optional.of(mappingContent));
                 }
 
-                public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings, Optional<XContentBuilder> mappingContent) {
+                public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<IndexSettings> indexSettings, Optional<TypeMapping> mappingContent) {
                     return build().createIndexAndAliases(client, indexSettings, mappingContent);
                 }
             }
@@ -164,11 +182,12 @@ public class IndexCreationFactory {
         private final int waitForActiveShards;
         private final IndexName indexName;
         private final ImmutableList<AliasName> aliases;
-        private final Optional<IndexCreationCustomElement> customAnalyzers;
-        private final Optional<IndexCreationCustomElement> customTokenizers;
+        private final ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers;
+        private final ImmutableList<IndexCreationCustomTokenizer> customTokenizers;
 
-        private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName, ImmutableList<AliasName> aliases,
-                                      Optional<IndexCreationCustomElement> customAnalyzers, Optional<IndexCreationCustomElement> customTokenizers) {
+        private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName,
+                                       ImmutableList<AliasName> aliases, ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers,
+                                       ImmutableList<IndexCreationCustomTokenizer> customTokenizers) {
             this.nbShards = nbShards;
             this.nbReplica = nbReplica;
             this.waitForActiveShards = waitForActiveShards;
@@ -178,8 +197,9 @@ public class IndexCreationFactory {
             this.customTokenizers = customTokenizers;
         }
 
-        public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings,
-                                                             Optional<XContentBuilder> mappingContent) {
+        public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client,
+                                                                Optional<IndexSettings> indexSettings,
+                                                                Optional<TypeMapping> mappingContent) {
             Preconditions.checkNotNull(indexName);
             try {
                 createIndexIfNeeded(client, indexName, indexSettings.orElse(generateSetting()), mappingContent);
@@ -193,31 +213,41 @@ public class IndexCreationFactory {
 
         private void createAliasIfNeeded(ReactorOpenSearchClient client, IndexName indexName, AliasName aliasName) throws IOException {
             if (!aliasExist(client, aliasName)) {
-                client.indices()
-                    .updateAliases(
-                        new IndicesAliasesRequest().addAliasAction(
-                            new AliasActions(AliasActions.Type.ADD)
+                client.updateAliases(
+                    new UpdateAliasesRequest.Builder()
+                        .actions(new Action.Builder()
+                            .add(new AddAction.Builder()
                                 .index(indexName.getValue())
-                                .alias(aliasName.getValue())),
-                        RequestOptions.DEFAULT);
+                                .alias(aliasName.getValue())
+                                .build())
+                            .build())
+                        .build())
+                    .block();
             }
         }
 
         private boolean aliasExist(ReactorOpenSearchClient client, AliasName aliasName) throws IOException {
-            return client.indices()
-                .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT);
+            return client.aliasExists(new ExistsAliasRequest.Builder()
+                    .name(aliasName.getValue())
+                    .build())
+                .map(BooleanResponse::value)
+                .block();
         }
 
-        private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, XContentBuilder settings, Optional<XContentBuilder> mappingContent) throws IOException {
+        private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, IndexSettings settings, Optional<TypeMapping> mappingContent) throws IOException {
             try {
                 if (!indexExists(client, indexName)) {
-                    CreateIndexRequest request = new CreateIndexRequest(indexName.getValue()).source(settings);
-                    mappingContent.ifPresent(request::mapping);
-                    client.indices().create(
-                        request,
-                        RequestOptions.DEFAULT);
+                    CreateIndexRequest.Builder request = new CreateIndexRequest.Builder()
+                        .index(indexName.getValue())
+                        .waitForActiveShards(new WaitForActiveShards.Builder()
+                            .count(waitForActiveShards)
+                            .build())
+                        .settings(settings);
+                    mappingContent.ifPresent(request::mappings);
+                    client.createIndex(request.build())
+                        .block();
                 }
-            } catch (OpenSearchStatusException exception) {
+            } catch (OpenSearchException exception) {
                 if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
                     LOGGER.info("Index [{}] already exists", indexName.getValue());
                 } else {
@@ -227,60 +257,66 @@ public class IndexCreationFactory {
         }
 
         private boolean indexExists(ReactorOpenSearchClient client, IndexName indexName) throws IOException {
-            return client.indices().exists(new GetIndexRequest(indexName.getValue()), RequestOptions.DEFAULT);
+            return client.indexExists(new ExistsRequest.Builder()
+                    .index(indexName.getValue())
+                    .build())
+                .map(BooleanResponse::value)
+                .block();
+        }
+
+        private IndexSettings generateSetting() {
+            return new IndexSettings.Builder()
+                .numberOfShards(Integer.toString(nbShards))
+                .numberOfReplicas(Integer.toString(nbReplica))
+                .analysis(new IndexSettingsAnalysis.Builder()
+                    .normalizer(CASE_INSENSITIVE, new Normalizer.Builder()
+                        .custom(generateNormalizer())
+                        .build())
+                    .analyzer(generateAnalyzers())
+                    .tokenizer(generateTokenizers())
+                    .build())
+                .build();
         }
 
-        private XContentBuilder generateSetting() throws IOException {
-            return jsonBuilder()
-                .startObject()
-                    .startObject("settings")
-                        .field("number_of_shards", nbShards)
-                        .field("number_of_replicas", nbReplica)
-                        .field("index.write.wait_for_active_shards", waitForActiveShards)
-                        .startObject("analysis")
-                            .startObject("normalizer")
-                                .startObject(CASE_INSENSITIVE)
-                                    .field("type", "custom")
-                                    .startArray("char_filter")
-                                    .endArray()
-                                    .startArray("filter")
-                                        .value("lowercase")
-                                        .value("asciifolding")
-                                    .endArray()
-                                .endObject()
-                            .endObject()
-                            .rawField(ANALYZER, generateAnalyzers(), XContentType.JSON)
-                            .rawField(TOKENIZER, generateTokenizer(), XContentType.JSON)
-                        .endObject()
-                    .endObject()
-                .endObject();
+        private CustomNormalizer generateNormalizer() {
+            return new CustomNormalizer.Builder()
+                .filter("lowercase", "asciifolding")
+                .build();
         }
 
-        private String analyzerDefault() throws IOException {
-            XContentBuilder analyzerBuilder = jsonBuilder()
-                .startObject()
-                    .startObject(KEEP_MAIL_AND_URL)
-                        .field("tokenizer", "uax_url_email")
-                        .startArray("filter")
-                            .value("lowercase")
-                            .value("stop")
-                        .endArray()
-                    .endObject()
-                .endObject();
-
-            return Strings.toString(analyzerBuilder);
+        private SnowballTokenFilter generateFilter() {
+            return new SnowballTokenFilter.Builder()
+                .language(SnowballLanguage.English)
+                .build();
         }
 
-        private InputStream generateAnalyzers() {
-            return new ByteArrayInputStream(customAnalyzers.orElseGet(Throwing.supplier(() -> IndexCreationCustomElement.from(analyzerDefault())).sneakyThrow())
-                .getPayload()
-                .getBytes(StandardCharsets.UTF_8));
+        private Map<String, Analyzer> defaultAnalyzers() {
+            return ImmutableMap.of(
+                KEEP_MAIL_AND_URL, new Analyzer.Builder().custom(
+                    new CustomAnalyzer.Builder()
+                        .tokenizer("uax_url_email")
+                        .filter("lowercase", "stop")
+                        .build())
+                    .build()
+            );
+        }
+
+        private Map<String, Analyzer> generateAnalyzers() {
+            if (customAnalyzers.isEmpty()) {
+                return defaultAnalyzers();
+            }
+            return customAnalyzers.stream()
+                .collect(Collectors.toMap(
+                    IndexCreationCustomAnalyzer::getKey,
+                    IndexCreationCustomAnalyzer::getAnalyzer));
         }
 
-        private InputStream generateTokenizer() {
-            return new ByteArrayInputStream(customTokenizers.orElse(IndexCreationCustomElement.EMPTY)
-                .getPayload()
-                .getBytes(StandardCharsets.UTF_8));
+        private Map<String, Tokenizer> generateTokenizers() {
+            return customTokenizers.stream()
+                .collect(Collectors.toMap(
+                    IndexCreationCustomTokenizer::getKey,
+                    IndexCreationCustomTokenizer::getTokenizer
+                ));
         }
     }
 
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
index de26140daf..49874fc8f9 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
@@ -19,7 +19,10 @@
 
 package org.apache.james.backends.opensearch;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
@@ -27,9 +30,8 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
-import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.client.Requests;
+import org.opensearch.client.opensearch.cluster.HealthRequest;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -54,24 +56,30 @@ public class OpenSearchHealthCheck implements HealthCheck {
 
     @Override
     public Mono<Result> check() {
-        String[] indices = indexNames.stream()
+        List<String> indices = indexNames.stream()
             .map(IndexName::getValue)
-            .toArray(String[]::new);
-        ClusterHealthRequest request = Requests.clusterHealthRequest(indices);
+            .collect(Collectors.toList());
+        HealthRequest request = new HealthRequest.Builder()
+            .index(indices)
+            .build();
 
-        return client.health(request)
-            .map(this::toHealthCheckResult)
-            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
+        try {
+            return client.health(request)
+                .map(this::toHealthCheckResult)
+                .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
+        } catch (IOException e) { // a synchronous I/O failure is reported like an asynchronous one: as an unhealthy result
+            return Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e));
+        }
     }
 
     @VisibleForTesting
-    Result toHealthCheckResult(ClusterHealthResponse response) {
-        switch (response.getStatus()) {
-            case GREEN:
-            case YELLOW:
+    Result toHealthCheckResult(HealthResponse response) {
+        switch (response.status()) {
+            case Green:
+            case Yellow:
                 return Result.healthy(COMPONENT_NAME);
-            case RED:
-                return Result.unhealthy(COMPONENT_NAME, response.getClusterName() + " status is RED");
+            case Red:
+                return Result.unhealthy(COMPONENT_NAME, response.clusterName() + " status is RED");
             default:
                 throw new NotImplementedException("Un-handled OpenSearch cluster status");
         }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
index 9a0002cf91..19c46e0dac 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
@@ -18,24 +18,23 @@
  ****************************************************************/
 package org.apache.james.backends.opensearch;
 
+import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.get.GetRequest;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.update.UpdateRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.ValidationException;
-import org.opensearch.common.xcontent.XContentType;
-import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.BulkResponse;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.IndexResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.util.RawValue;
 import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Mono;
@@ -59,11 +58,17 @@ public class OpenSearchIndexer {
     public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
         checkArgument(content);
         logContent(id, content);
-        return client.index(new IndexRequest(aliasName.getValue())
+
+        try {
+            return client.index(new IndexRequest.Builder<>()
+                .index(aliasName.getValue())
                 .id(id.asString())
-                .source(content, XContentType.JSON)
-                .routing(routingKey.asString()),
-            RequestOptions.DEFAULT);
+                .document(new RawValue(content))
+                .routing(routingKey.asString())
+                .build());
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 
     private void logContent(DocumentId id, String content) {
@@ -75,50 +80,70 @@ public class OpenSearchIndexer {
     public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) {
         Preconditions.checkNotNull(updatedDocumentParts);
         Preconditions.checkNotNull(routingKey);
-        BulkRequest request = new BulkRequest();
-        updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
-            new UpdateRequest(aliasName.getValue(),
-                updatedDocumentPart.getId().asString())
-                .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)
-                .routing(routingKey.asString())));
-
-        return client.bulk(request, RequestOptions.DEFAULT)
-            .onErrorResume(ValidationException.class, exception -> {
-                LOGGER.warn("Error while updating index", exception);
-                return Mono.empty();
-            });
+
+        if (updatedDocumentParts.isEmpty()) {
+            return Mono.empty();
+        }
+
+        BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
+        updatedDocumentParts.forEach(updatedDocumentPart -> bulkBuilder.operations(
+            op -> op.update(idx -> idx
+                .index(aliasName.getValue())
+                .id(updatedDocumentPart.getId().asString())
+                .document(Collections.singletonMap("doc", new RawValue(updatedDocumentPart.getUpdatedDocumentPart())))
+                .routing(routingKey.asString())
+            )));
+
+        try {
+            return client.bulk(bulkBuilder.build());
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 
     public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) {
-        BulkRequest request = new BulkRequest();
-        ids.forEach(id -> request.add(
-            new DeleteRequest(aliasName.getValue())
+        if (ids.isEmpty()) {
+            return Mono.empty();
+        }
+
+        BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
+
+        ids.forEach(id -> bulkBuilder.operations(
+            op -> op.delete(idx -> idx
+                .index(aliasName.getValue())
                 .id(id.asString())
-                .routing(routingKey.asString())));
+                .routing(routingKey.asString())
+            )));
 
-        return client.bulk(request, RequestOptions.DEFAULT)
-            .onErrorResume(ValidationException.class, exception -> {
-                LOGGER.warn("Error while deleting index", exception);
-                return Mono.empty();
-            });
+        try {
+            return client.bulk(bulkBuilder.build());
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 
-    public Mono<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
-        return deleteByQueryPerformer.perform(queryBuilder, routingKey);
+    public Mono<Void> deleteAllMatchingQuery(Query query, RoutingKey routingKey) {
+        return deleteByQueryPerformer.perform(query, routingKey);
     }
 
     private void checkArgument(String content) {
         Preconditions.checkArgument(content != null, "content should be provided");
     }
 
-    public Mono<GetResponse> get(DocumentId id, RoutingKey routingKey) {
-        return Mono.fromRunnable(() -> {
-                Preconditions.checkNotNull(id);
-                Preconditions.checkNotNull(routingKey);
-            })
-            .then(client.get(new GetRequest(aliasName.getValue())
-                    .id(id.asString())
-                    .routing(routingKey.asString()),
-                RequestOptions.DEFAULT));
+    public Mono<GetResponse<ObjectNode>> get(DocumentId id, RoutingKey routingKey) {
+        try {
+            return Mono.fromRunnable(() -> {
+                    Preconditions.checkNotNull(id);
+                    Preconditions.checkNotNull(routingKey);
+                })
+                .then(client.get(
+                    new GetRequest.Builder()
+                        .index(aliasName.getValue())
+                        .id(id.asString())
+                        .routing(routingKey.asString())
+                        .build()));
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
index 1c739fcf37..d2818e2ad3 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
@@ -20,160 +20,129 @@
 package org.apache.james.backends.opensearch;
 
 import java.io.IOException;
-import java.util.function.Consumer;
-
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
-import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
-import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.delete.DeleteResponse;
-import org.opensearch.action.explain.ExplainRequest;
-import org.opensearch.action.explain.ExplainResponse;
-import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest;
-import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse;
-import org.opensearch.action.get.GetRequest;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.search.ClearScrollRequest;
-import org.opensearch.action.search.ClearScrollResponse;
-import org.opensearch.action.search.MultiSearchRequest;
-import org.opensearch.action.search.MultiSearchResponse;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.action.search.SearchScrollRequest;
-import org.opensearch.action.support.master.AcknowledgedResponse;
-import org.opensearch.client.IndicesClient;
-import org.opensearch.client.RequestOptions;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
 import org.opensearch.client.RestClient;
-import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.client.core.MainResponse;
-import org.opensearch.index.rankeval.RankEvalRequest;
-import org.opensearch.index.rankeval.RankEvalResponse;
-import org.opensearch.index.reindex.BulkByScrollResponse;
-import org.opensearch.index.reindex.DeleteByQueryRequest;
-import org.opensearch.script.mustache.MultiSearchTemplateRequest;
-import org.opensearch.script.mustache.MultiSearchTemplateResponse;
-import org.opensearch.script.mustache.SearchTemplateRequest;
-import org.opensearch.script.mustache.SearchTemplateResponse;
+import org.opensearch.client.opensearch.OpenSearchAsyncClient;
+import org.opensearch.client.opensearch.cluster.HealthRequest;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.BulkResponse;
+import org.opensearch.client.opensearch.core.ClearScrollRequest;
+import org.opensearch.client.opensearch.core.ClearScrollResponse;
+import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
+import org.opensearch.client.opensearch.core.DeleteByQueryResponse;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.DeleteResponse;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.IndexResponse;
+import org.opensearch.client.opensearch.core.InfoResponse;
+import org.opensearch.client.opensearch.core.ScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.SearchResponse;
+import org.opensearch.client.opensearch.indices.CreateIndexRequest;
+import org.opensearch.client.opensearch.indices.CreateIndexResponse;
+import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
+import org.opensearch.client.opensearch.indices.ExistsRequest;
+import org.opensearch.client.opensearch.indices.UpdateAliasesRequest;
+import org.opensearch.client.opensearch.indices.UpdateAliasesResponse;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
 import reactor.core.scheduler.Schedulers;
 
 public class ReactorOpenSearchClient implements AutoCloseable {
-    private final RestHighLevelClient client;
+    private final OpenSearchAsyncClient client;
+    private final RestClient lowLevelRestClient;
 
-    public ReactorOpenSearchClient(RestHighLevelClient client) {
+    public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) {
         this.client = client;
+        this.lowLevelRestClient = lowLevelRestClient;
     }
 
-    public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) {
-        return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener));
-    }
-
-    public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) {
-        return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
-    }
-
-    public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
-        return client.delete(deleteRequest, options);
-    }
-
-    public Mono<BulkByScrollResponse> deleteByQuery(DeleteByQueryRequest deleteRequest, RequestOptions options) {
-        return toReactor(listener -> client.deleteByQueryAsync(deleteRequest, options, listener));
+    public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException {
+        return toReactor(client.bulk(bulkRequest));
     }
 
-    public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
-        return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+    public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
+        return toReactor(client.clearScroll(clearScrollRequest));
     }
 
-    public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
-        return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+    public Mono<DeleteResponse> delete(DeleteRequest deleteRequest) throws IOException {
+        return toReactor(client.delete(deleteRequest));
     }
 
-    public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
-        return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+    public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest deleteRequest) throws IOException {
+        return toReactor(client.deleteByQuery(deleteRequest));
     }
 
     public RestClient getLowLevelClient() {
-        return client.getLowLevelClient();
+        return lowLevelRestClient;
     }
 
-    public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
-        return toReactor(listener -> client.getScriptAsync(request, options, listener));
+    public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException {
+        return toReactor(client.index(indexRequest));
     }
 
-    public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
-        return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+    public Mono<BooleanResponse> indexExists(ExistsRequest existsRequest) throws IOException {
+        return toReactor(client.indices().exists(existsRequest));
     }
 
-    public IndicesClient indices() {
-        return client.indices();
+    public Mono<BooleanResponse> aliasExists(ExistsAliasRequest existsAliasRequest) throws IOException {
+        return toReactor(client.indices().existsAlias(existsAliasRequest));
     }
 
-    public MainResponse info(RequestOptions options) throws IOException {
-        return client.info(options);
+    public Mono<CreateIndexResponse> createIndex(CreateIndexRequest indexRequest) throws IOException {
+        return toReactor(client.indices().create(indexRequest));
     }
 
-    public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
-        return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+    public Mono<UpdateAliasesResponse> updateAliases(UpdateAliasesRequest updateAliasesRequest) throws IOException {
+        return toReactor(client.indices().updateAliases(updateAliasesRequest));
     }
 
-    public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
-        return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+    public Mono<InfoResponse> info() throws IOException {
+        return toReactor(client.info());
     }
 
-    public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
-        return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+    public Mono<ScrollResponse<ObjectNode>> scroll(ScrollRequest scrollRequest) throws IOException {
+        return toReactor(client.scroll(scrollRequest, ObjectNode.class));
     }
 
-    public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
-        return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
+    public Mono<SearchResponse<ObjectNode>> search(SearchRequest searchRequest) throws IOException {
+        return toReactor(client.search(searchRequest, ObjectNode.class));
     }
 
-    public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) {
-        return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
+    public Mono<HealthResponse> health(HealthRequest request) throws IOException {
+        return toReactor(client.cluster().health(request));
     }
 
-    public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) {
-        return toReactor(listener -> client.cluster()
-            .healthAsync(request, RequestOptions.DEFAULT, listener));
-    }
-
-    public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
-        return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
-    }
-
-    public Mono<GetResponse> get(GetRequest getRequest, RequestOptions options) {
-        return toReactor(listener -> client.getAsync(getRequest, options, listener));
+    public Mono<GetResponse<ObjectNode>> get(GetRequest getRequest) throws IOException {
+        return toReactor(client.get(getRequest, ObjectNode.class));
     }
 
     @Override
     public void close() throws IOException {
-        client.close();
+        lowLevelRestClient.close();
     }
 
-    private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
-        return Mono.<T>create(sink -> async.accept(getListener(sink)))
-            .publishOn(Schedulers.boundedElastic());
+    private static <T> Mono<T> toReactor(CompletableFuture<T> async) { // bridges the client's CompletableFuture into a Mono
+        return Mono.<T>create(sink -> async.whenComplete(getFuture(sink)))
+            .publishOn(Schedulers.boundedElastic()); // Schedulers.elastic() is deprecated; keep the bounded pool used before the migration
     }
 
-    private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
-        return new ActionListener<T>() {
-            @Override
-            public void onResponse(T t) {
-                sink.success(t);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                sink.error(e);
+    private static <T> BiConsumer<? super T, ? super Throwable> getFuture(MonoSink<T> sink) {
+        return (response, exception) -> {
+            if (exception != null) {
+                sink.error(exception);
+            } else {
+                sink.success(response);
             }
         };
     }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
index bb823f8e9f..4dedbc67fe 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
@@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch;
 
 import java.util.Objects;
 
-import org.opensearch.common.Strings;
-
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 public class RoutingKey {
     public interface Factory<T> {
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
index f221ccd45c..76c415b9e1 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
@@ -20,10 +20,9 @@ package org.apache.james.backends.opensearch;
 
 import java.util.Objects;
 
-import org.opensearch.common.Strings;
-
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 public class UpdatedRepresentation {
     private final DocumentId id;
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
index ecb9cf955a..3a5d22d1ce 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
@@ -19,19 +19,20 @@
 
 package org.apache.james.backends.opensearch.search;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import org.apache.james.backends.opensearch.ReactorOpenSearchClient;
-import org.opensearch.action.search.ClearScrollRequest;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.action.search.SearchScrollRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.search.SearchHit;
-
+import org.opensearch.client.opensearch._types.Time;
+import org.opensearch.client.opensearch.core.ClearScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
@@ -39,8 +40,9 @@ import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 public class ScrolledSearch {
-
-    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+    private static final Time TIMEOUT = new Time.Builder()
+        .time("1m")
+        .build();
 
     private final ReactorOpenSearchClient client;
     private final SearchRequest searchRequest;
@@ -50,12 +52,12 @@ public class ScrolledSearch {
         this.searchRequest = searchRequest;
     }
 
-    public Flux<SearchHit> searchHits() {
+    public Flux<Hit<ObjectNode>> searchHits() {
         return searchResponses()
-            .concatMap(searchResponse -> Flux.just(searchResponse.getHits().getHits()));
+            .concatMap(searchResponse -> Flux.fromIterable(searchResponse.hits().hits()));
     }
 
-    public Flux<SearchResponse> searchResponses() {
+    private Flux<ScrollResponse<ObjectNode>> searchResponses() {
         return Flux.push(sink -> {
             AtomicReference<Optional<String>> scrollId = new AtomicReference<>(Optional.empty());
             sink.onRequest(numberOfRequestedElements -> next(sink, scrollId, numberOfRequestedElements));
@@ -64,17 +66,16 @@ public class ScrolledSearch {
         });
     }
 
-    private void next(FluxSink<SearchResponse> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) {
+    private void next(FluxSink<ScrollResponse<ObjectNode>> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) {
         if (numberOfRequestedElements <= 0) {
             return;
         }
 
-        Consumer<SearchResponse> onResponse = searchResponse -> {
-            scrollId.set(Optional.of(searchResponse.getScrollId()));
+        Consumer<ScrollResponse<ObjectNode>> onResponse = searchResponse -> {
+            scrollId.set(Optional.of(searchResponse.scrollId()));
             sink.next(searchResponse);
 
-            boolean noHitsLeft = searchResponse.getHits().getHits().length == 0;
-            if (noHitsLeft) {
+            if (searchResponse.hits().hits().isEmpty()) {
                 sink.complete();
             } else {
                 next(sink, scrollId, numberOfRequestedElements - 1);
@@ -87,22 +88,33 @@ public class ScrolledSearch {
             .subscribe(onResponse, onFailure);
     }
 
-    private Mono<SearchResponse> buildRequest(Optional<String> scrollId) {
-        return scrollId.map(id ->
-            client.scroll(
-                new SearchScrollRequest()
-                    .scrollId(scrollId.get())
-                    .scroll(TIMEOUT),
-                RequestOptions.DEFAULT))
-            .orElseGet(() -> client.search(searchRequest, RequestOptions.DEFAULT));
+    private Mono<ScrollResponse<ObjectNode>> buildRequest(Optional<String> scrollId) {
+        try { // I/O failures of either branch are surfaced through the returned Mono instead of being thrown
+            if (scrollId.isPresent()) { // a scroll is already open: fetch its next page
+                return client.scroll(new ScrollRequest.Builder()
+                    .scrollId(scrollId.get())
+                    .scroll(TIMEOUT)
+                    .build());
+            }
+            return client.search(searchRequest) // first page: adapt the SearchResponse to the ScrollResponse shape
+                .map(response -> new ScrollResponse.Builder<ObjectNode>()
+                    .scrollId(response.scrollId())
+                    .hits(response.hits())
+                    .took(response.took())
+                    .timedOut(response.timedOut())
+                    .shards(response.shards())
+                    .build());
+        } catch (IOException e) {
+            return Mono.error(e);
+        }
     }
 
     public void close(AtomicReference<Optional<String>> scrollId) {
-        scrollId.get().map(id -> {
-                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-                clearScrollRequest.addScrollId(id);
-                return clearScrollRequest;
-            }).ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe()).sneakyThrow());
+        scrollId.get().map(id -> new ClearScrollRequest.Builder()
+                .scrollId(id)
+                .build())
+            .ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest ->
+                client.clearScroll(clearScrollRequest).subscribe()).sneakyThrow());
     }
 
 }
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
index 9434324172..9f89156f42 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.james.backends.opensearch.OpenSearchClusterExtension.OpenSearchCluster;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.SearchRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,9 +74,9 @@ interface ClientProviderImplConnectionContract {
     default boolean isConnected(ClientProvider clientProvider) {
         try (ReactorOpenSearchClient client = clientProvider.get()) {
             client.search(
-                new SearchRequest()
-                    .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
-                RequestOptions.DEFAULT).block();
+                new SearchRequest.Builder()
+                    .query(new MatchAllQuery.Builder().build()._toQuery())
+                    .build()).block();
             return true;
         } catch (Exception e) {
             LOGGER.info("Caught exception while trying to connect", e);
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
index 34a105b5e6..021a60d790 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
@@ -19,18 +19,14 @@
 
 package org.apache.james.backends.opensearch;
 
-import java.time.Duration;
-
 import org.junit.jupiter.api.extension.AfterEachCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.ParameterContext;
 import org.junit.jupiter.api.extension.ParameterResolutionException;
 import org.junit.jupiter.api.extension.ParameterResolver;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.SearchRequest;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachCallback, ParameterResolver {
@@ -57,20 +53,20 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
         }
 
         @Override
-        public void clean(DockerOpenSearch elasticSearch) {
+        public void clean(DockerOpenSearch openSearch) {
             Awaitility.await()
                 .until(() -> {
-                    elasticSearch.flushIndices();
-                    ReactorOpenSearchClient client = elasticSearch.clientProvider().get();
+                    openSearch.flushIndices();
+                    ReactorOpenSearchClient client = openSearch.clientProvider().get();
                     new DeleteByQueryPerformer(client, aliasName)
-                        .perform(QueryBuilders.matchAllQuery())
+                        .perform(new MatchAllQuery.Builder().build()._toQuery())
                         .block();
-                    SearchRequest searchRequest = new SearchRequest();
-                    searchRequest.source(new SearchSourceBuilder()
-                        .query(QueryBuilders.matchAllQuery()));
-                    elasticSearch.flushIndices();
-                    boolean result = client.search(new SearchRequest(searchRequest), RequestOptions.DEFAULT)
-                        .map(searchResponse -> searchResponse.getHits().getHits().length)
+                    SearchRequest searchRequest = new SearchRequest.Builder()
+                        .query(new MatchAllQuery.Builder().build()._toQuery())
+                        .build();
+                    openSearch.flushIndices();
+                    boolean result = client.search(searchRequest)
+                        .map(searchResponse -> searchResponse.hits().hits().size())
                         .block() == 0;
 
                     try {
@@ -83,7 +79,7 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
         }
     }
 
-    private final DockerOpenSearch elasticSearch = DockerOpenSearchSingleton.INSTANCE;
+    private final DockerOpenSearch openSearch = DockerOpenSearchSingleton.INSTANCE;
     private final CleanupStrategy cleanupStrategy;
 
     public DockerOpenSearchExtension() {
@@ -96,13 +92,13 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
 
     @Override
     public void afterEach(ExtensionContext context) {
-        cleanupStrategy.clean(elasticSearch);
+        cleanupStrategy.clean(openSearch);
     }
 
     @Override
     public void beforeEach(ExtensionContext extensionContext) {
-        if (!elasticSearch.isRunning()) {
-            elasticSearch.unpause();
+        if (!openSearch.isRunning()) {
+            openSearch.unpause();
         }
         awaitForOpenSearch();
     }
@@ -114,14 +110,14 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
 
     @Override
     public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
-        return elasticSearch;
+        return openSearch;
     }
 
     public void awaitForOpenSearch() {
-        elasticSearch.flushIndices();
+        openSearch.flushIndices();
     }
 
     public DockerOpenSearch getDockerOpenSearch() {
-        return elasticSearch;
+        return openSearch;
     }
 }
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
index 8cda09676f..2b169aaaa5 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
@@ -19,78 +19,77 @@
 
 package org.apache.james.backends.opensearch;
 
-import static org.apache.james.backends.opensearch.IndexCreationFactory.ANALYZER;
-import static org.apache.james.backends.opensearch.IndexCreationFactory.TOKENIZER;
-import static org.apache.james.backends.opensearch.IndexCreationFactory.TYPE;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomElement;
+import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomAnalyzer;
+import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomTokenizer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.OpenSearchStatusException;
-import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.client.opensearch._types.analysis.Analyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer;
+import org.opensearch.client.opensearch._types.analysis.NGramTokenFilter;
+import org.opensearch.client.opensearch._types.analysis.PatternTokenizer;
+import org.opensearch.client.opensearch._types.analysis.TokenFilter;
+import org.opensearch.client.opensearch._types.analysis.TokenFilterDefinition;
+import org.opensearch.client.opensearch._types.analysis.Tokenizer;
+import org.opensearch.client.opensearch._types.analysis.TokenizerDefinition;
+import org.opensearch.client.opensearch.indices.IndexSettings;
+import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis;
 
 class IndexCreationFactoryTest {
     private static final IndexName INDEX_NAME = new IndexName("index");
     private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
 
-    public static XContentBuilder getValidIndexSetting() throws IOException {
-        return jsonBuilder()
-            .startObject()
-                .startObject("settings")
-                    .startObject("index")
-                        .field("max_ngram_diff", 10)
-                    .endObject()
-                    .startObject("analysis")
-                        .startObject(ANALYZER)
-                            .startObject("email_ngram_filter_analyzer")
-                                .field(TOKENIZER, "uax_url_email")
-                                .startArray("filter")
-                                    .value("ngram_filter")
-                                .endArray()
-                            .endObject()
-                        .endObject()
-                        .startObject("filter")
-                            .startObject("ngram_filter")
-                                .field(TYPE, "ngram")
-                                .field("min_gram", 3)
-                                .field("max_gram", 13)
-                            .endObject()
-                        .endObject()
-                    .endObject()
-                .endObject()
-            .endObject();
+    public static IndexSettings getValidIndexSetting() {
+        return new IndexSettings.Builder()
+            .index(new IndexSettings.Builder()
+                .maxNgramDiff(10)
+                .build())
+            .analysis(new IndexSettingsAnalysis.Builder()
+                .analyzer("email_ngram_filter_analyzer", new Analyzer.Builder()
+                    .custom(generateAnalyzer())
+                    .build())
+                .filter("ngram_filter", new TokenFilter.Builder()
+                    .definition(new TokenFilterDefinition.Builder()
+                        .ngram(generateFilter())
+                        .build())
+                    .build())
+                .build())
+            .build();
     }
 
-    public static XContentBuilder getInvalidIndexSetting() throws IOException {
-        return jsonBuilder()
-            .startObject()
-                .startObject("settings")
-                    .startObject("analysis")
-                        .startObject(ANALYZER)
-                            .startObject("email_ngram_filter_analyzer")
-                                .field(TOKENIZER, "uax_url_email")
-                                .startArray("filter")
-                                    .value("ngram_filter")
-                                .endArray()
-                            .endObject()
-                        .endObject()
-                        .startObject("filter")
-                            .startObject("ngram_filter")
-                                .field(TYPE, "ngram")
-                                .field("min_gram", 3)
-                                .field("max_gram", 13)
-                            .endObject()
-                        .endObject()
-                    .endObject()
-                .endObject()
-            .endObject();
+    public static IndexSettings getInvalidIndexSetting() {
+        return new IndexSettings.Builder()
+            .analysis(new IndexSettingsAnalysis.Builder()
+                .analyzer("email_ngram_filter_analyzer", new Analyzer.Builder()
+                    .custom(generateAnalyzer())
+                    .build())
+                .filter("ngram_filter", new TokenFilter.Builder()
+                    .definition(new TokenFilterDefinition.Builder()
+                        .ngram(generateFilter())
+                        .build())
+                    .build())
+                .build())
+            .build();
+    }
+
+    private static CustomAnalyzer generateAnalyzer() {
+        return new CustomAnalyzer.Builder()
+            .tokenizer("uax_url_email")
+            .filter("ngram_filter")
+            .build();
+    }
+
+    private static NGramTokenFilter generateFilter() {
+        return new NGramTokenFilter.Builder()
+            .minGram(3)
+            .maxGram(13)
+            .build();
     }
 
     @RegisterExtension
@@ -133,19 +132,14 @@ class IndexCreationFactoryTest {
         new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
-            .customAnalyzers(IndexCreationCustomElement.from("{" +
-                "    \"my_custom_analyzer\": {" +
-                "        \"type\": \"custom\"," +
-                "        \"tokenizer\": \"standard\"," +
-                "        \"char_filter\": [" +
-                "            \"html_strip\"" +
-                "        ]," +
-                "        \"filter\": [" +
-                "            \"lowercase\"," +
-                "            \"asciifolding\"" +
-                "        ]" +
-                "    }" +
-                "}"))
+            .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+                new Analyzer.Builder()
+                    .custom(new CustomAnalyzer.Builder()
+                        .tokenizer("standard")
+                        .filter("lowercase", "asciifolding")
+                        .charFilter("html_strip")
+                        .build())
+                    .build()))
             .createIndexAndAliases(client);
     }
 
@@ -155,14 +149,14 @@ class IndexCreationFactoryTest {
             new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
                 .useIndex(INDEX_NAME)
                 .addAlias(ALIAS_NAME)
-                .customAnalyzers(IndexCreationCustomElement.from("{" +
-                    "    \"my_custom_analyzer\": {" +
-                    "        \"type\": \"invalid\"," +
-                    "        \"tokenizer\": \"not_Found_tokenizer\"" +
-                    "    }" +
-                    "}"))
+                .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+                    new Analyzer.Builder()
+                        .custom(new CustomAnalyzer.Builder()
+                            .tokenizer("not_Found_tokenizer")
+                            .build())
+                        .build()))
                 .createIndexAndAliases(client))
-            .isInstanceOf(OpenSearchStatusException.class);
+            .isInstanceOf(Exception.class);
     }
 
     @Test
@@ -170,12 +164,16 @@ class IndexCreationFactoryTest {
         new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
-            .customTokenizers(IndexCreationCustomElement.from("{" +
-                "        \"custom_tokenizer\": { " +
-                "          \"type\": \"pattern\"," +
-                "          \"pattern\": \"[ .,!?]\"" +
-                "        }" +
-                "      }"))
+            .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+                new Tokenizer.Builder()
+                    .definition(new TokenizerDefinition.Builder()
+                        .pattern(new PatternTokenizer.Builder()
+                            .pattern("[ .,!?]")
+                            .flags("CASE_INSENSITIVE|COMMENTS")
+                            .group(0)
+                            .build())
+                        .build())
+                    .build()))
             .createIndexAndAliases(client);
     }
 
@@ -185,14 +183,16 @@ class IndexCreationFactoryTest {
             new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
                 .useIndex(INDEX_NAME)
                 .addAlias(ALIAS_NAME)
-                .customTokenizers(IndexCreationCustomElement.from("{" +
-                    "        \"custom_tokenizer\": { " +
-                    "          \"type\": \"invalidType\"," +
-                    "          \"pattern\": \"[ .,!?]\"" +
-                    "        }" +
-                    "      }"))
+                .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+                    new Tokenizer.Builder()
+                        .definition(new TokenizerDefinition.Builder()
+                            .pattern(new PatternTokenizer.Builder()
+                                .pattern("[ .,!?]")
+                                .build())
+                            .build())
+                        .build()))
                 .createIndexAndAliases(client))
-            .isInstanceOf(OpenSearchStatusException.class);
+            .isInstanceOf(Exception.class);
     }
 
     @Test
@@ -200,25 +200,28 @@ class IndexCreationFactoryTest {
         new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
-            .customAnalyzers(IndexCreationCustomElement.from("{" +
-                "        \"my_custom_analyzer\": { " +
-                "          \"tokenizer\": \"custom_tokenizer\"," +
-                "          \"filter\": [" +
-                "            \"lowercase\"" +
-                "          ]" +
-                "        }" +
-                "      }"))
-            .customTokenizers(IndexCreationCustomElement.from("{" +
-                "        \"custom_tokenizer\": { " +
-                "          \"type\": \"pattern\"," +
-                "          \"pattern\": \"[ .,!?]\"" +
-                "        }" +
-                "      }"))
+            .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+                new Analyzer.Builder()
+                    .custom(new CustomAnalyzer.Builder()
+                        .tokenizer("custom_tokenizer")
+                        .filter("lowercase")
+                        .build())
+                    .build()))
+            .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+                new Tokenizer.Builder()
+                    .definition(new TokenizerDefinition.Builder()
+                        .pattern(new PatternTokenizer.Builder()
+                            .pattern("[ .,!?]")
+                            .flags("CASE_INSENSITIVE|COMMENTS")
+                            .group(0)
+                            .build())
+                        .build())
+                    .build()))
             .createIndexAndAliases(client);
     }
 
     @Test
-    void customIndexSettingShouldNotThrowWhenValidSetting() throws IOException {
+    void customIndexSettingShouldNotThrowWhenValidSetting() {
         new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
             .useIndex(INDEX_NAME)
             .addAlias(ALIAS_NAME)
@@ -232,7 +235,7 @@ class IndexCreationFactoryTest {
                 .useIndex(INDEX_NAME)
                 .addAlias(ALIAS_NAME)
                 .createIndexAndAliases(client, Optional.of(getInvalidIndexSetting()), Optional.empty()))
-            .isInstanceOf(OpenSearchStatusException.class);
+            .isInstanceOf(Exception.class);
     }
 
 }
\ No newline at end of file
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
index 5bacd05967..1e40074c7f 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
@@ -22,17 +22,32 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.cluster.ClusterName;
-import org.opensearch.cluster.ClusterState;
-import org.opensearch.cluster.block.ClusterBlocks;
-import org.opensearch.cluster.health.ClusterHealthStatus;
-import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.client.opensearch._types.HealthStatus;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
 
 import com.google.common.collect.ImmutableSet;
 
 class OpenSearchHealthCheckTest {
+    private static HealthResponse fakeHealthResponse(HealthStatus status) {
+        return new HealthResponse.Builder()
+            .clusterName("fake-cluster")
+            .activePrimaryShards(0)
+            .activeShards(0)
+            .activeShardsPercentAsNumber("0")
+            .delayedUnassignedShards(0)
+            .initializingShards(0)
+            .numberOfDataNodes(0)
+            .numberOfInFlightFetch(0)
+            .numberOfNodes(0)
+            .numberOfPendingTasks(0)
+            .relocatingShards(0)
+            .taskMaxWaitingInQueueMillis(String.valueOf(System.currentTimeMillis()))
+            .timedOut(false)
+            .unassignedShards(0)
+            .status(status)
+            .build();
+    }
+
     private OpenSearchHealthCheck healthCheck;
 
     @BeforeEach
@@ -42,39 +57,22 @@ class OpenSearchHealthCheckTest {
 
     @Test
     void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsGreen() {
-        FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.GREEN);
+        HealthResponse response = fakeHealthResponse(HealthStatus.Green);
 
         assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue();
     }
 
     @Test
     void checkShouldReturnUnHealthyWhenOpenSearchClusterHealthStatusIsRed() {
-        FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.RED);
+        HealthResponse response = fakeHealthResponse(HealthStatus.Red);
 
         assertThat(healthCheck.toHealthCheckResult(response).isUnHealthy()).isTrue();
     }
 
     @Test
     void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsYellow() {
-        FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.YELLOW);
+        HealthResponse response = fakeHealthResponse(HealthStatus.Yellow);
 
         assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue();
     }
-
-    private static class FakeClusterHealthResponse extends ClusterHealthResponse {
-        private final ClusterHealthStatus status;
-
-        private FakeClusterHealthResponse(ClusterHealthStatus clusterHealthStatus) {
-            super("fake-cluster", new String[0],
-                new ClusterState(new ClusterName("fake-cluster"), 0, null, null, RoutingTable.builder().build(),
-                    DiscoveryNodes.builder().build(),
-                    ClusterBlocks.builder().build(), null, 0, false));
-            this.status = clusterHealthStatus;
-        }
-
-        @Override
-        public ClusterHealthStatus getStatus() {
-            return this.status;
-        }
-    }
 }
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
index 2c8ba75c6e..9af1bd3806 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
-import static org.opensearch.index.query.QueryBuilders.termQuery;
 
 import java.io.IOException;
 
@@ -34,12 +33,13 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.FieldValue;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch._types.query_dsl.MatchQuery;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch._types.query_dsl.TermQuery;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
 
 import com.google.common.collect.ImmutableList;
 
@@ -79,32 +79,36 @@ class OpenSearchIndexerTest {
     }
 
     @Test
-    void indexMessageShouldWork() {
+    void indexMessageShouldWork() throws IOException {
         DocumentId documentId = DocumentId.fromString("1");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
         
         testee.index(documentId, content, useDocumentId(documentId)).block();
 
-        awaitForOpenSearch(QueryBuilders.matchQuery("message", "trying"), 1L);
+        awaitForOpenSearch(new MatchQuery.Builder()
+            .field("message")
+            .query(new FieldValue.Builder().stringValue("trying").build())
+            .build()
+            ._toQuery(), 1L);
     }
     
     @Test
     void indexMessageShouldThrowWhenJsonIsNull() {
-        assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING))
+        assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING).block())
             .isInstanceOf(IllegalArgumentException.class);
     }
     
     @Test
-    void updateMessages() {
+    void updateMessages() throws IOException {
         String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
 
         testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block();
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
 
         testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block();
-        awaitForOpenSearch(QueryBuilders.matchQuery("message", "mastering"), 1L);
+        awaitForOpenSearch(new MatchQuery.Builder().field("message").query(new FieldValue.Builder().stringValue("mastering").build()).build()._toQuery(), 1L);
 
-        awaitForOpenSearch(QueryBuilders.matchQuery("field", "unchanged"), 1L);
+        awaitForOpenSearch(new MatchQuery.Builder().field("field").query(new FieldValue.Builder().stringValue("unchanged").build()).build()._toQuery(), 1L);
     }
 
     @Test
@@ -136,21 +140,21 @@ class OpenSearchIndexerTest {
     }
 
     @Test
-    void deleteByQueryShouldWorkOnSingleMessage() {
+    void deleteByQueryShouldWorkOnSingleMessage() throws IOException {
         DocumentId documentId =  DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
         RoutingKey routingKey = useDocumentId(documentId);
 
         testee.index(documentId, content, routingKey).block();
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
 
-        testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey).block();
+        testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), routingKey).block();
 
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L);
     }
 
     @Test
-    void deleteByQueryShouldWorkWhenMultipleMessages() {
+    void deleteByQueryShouldWorkWhenMultipleMessages() throws IOException {
         DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
         
@@ -165,28 +169,28 @@ class OpenSearchIndexerTest {
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
         
         testee.index(documentId3, content3, ROUTING).block();
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L);
 
-        testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING).block();
+        testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), ROUTING).block();
 
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
     }
     
     @Test
-    void deleteMessage() {
+    void deleteMessage() throws IOException {
         DocumentId documentId = DocumentId.fromString("1:2");
         String content = "{\"message\": \"trying out Elasticsearch\"}";
 
         testee.index(documentId, content, useDocumentId(documentId)).block();
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
 
         testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block();
 
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L);
     }
 
     @Test
-    void deleteShouldWorkWhenMultipleMessages() {
+    void deleteShouldWorkWhenMultipleMessages() throws IOException {
         DocumentId documentId = DocumentId.fromString("1:1");
         String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
         testee.index(documentId, content, ROUTING).block();
@@ -199,11 +203,11 @@ class OpenSearchIndexerTest {
         String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
         testee.index(documentId3, content3, ROUTING).block();
 
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L);
 
         testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block();
 
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
     }
     
     @Test
@@ -219,16 +223,16 @@ class OpenSearchIndexerTest {
     }
 
     @Test
-    void getShouldWork() {
+    void getShouldWork() throws IOException {
         DocumentId documentId = DocumentId.fromString("1");
         String content = "{\"message\":\"trying out Elasticsearch\"}";
 
         testee.index(documentId, content, useDocumentId(documentId)).block();
-        awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+        awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
 
         GetResponse getResponse = testee.get(documentId, useDocumentId(documentId)).block();
 
-        assertThat(getResponse.getSourceAsString()).isEqualTo(content);
+        assertThat(getResponse.source().toString()).isEqualTo(content);
     }
 
     @Test
@@ -243,13 +247,13 @@ class OpenSearchIndexerTest {
             .isInstanceOf(NullPointerException.class);
     }
 
-    private void awaitForOpenSearch(QueryBuilder query, long totalHits) {
+    private void awaitForOpenSearch(Query query, long totalHits) {
         CALMLY_AWAIT.atMost(Durations.TEN_SECONDS)
             .untilAsserted(() -> assertThat(client.search(
-                new SearchRequest(INDEX_NAME.getValue())
-                    .source(new SearchSourceBuilder().query(query)),
-                RequestOptions.DEFAULT)
+                new SearchRequest.Builder()
+                    .query(query)
+                    .build())
                 .block()
-                .getHits().getTotalHits().value).isEqualTo(totalHits));
+                .hits().total().value()).isEqualTo(totalHits));
     }
 }
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
index dcd10da200..9e3e32455c 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.james.backends.opensearch.DockerOpenSearchExtension;
 import org.apache.james.backends.opensearch.IndexCreationFactory;
@@ -36,18 +37,20 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.SearchHit;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.Time;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+import com.fasterxml.jackson.databind.util.RawValue;
 
 class ScrolledSearchTest {
-    private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+    private static final Time TIMEOUT = new Time.Builder()
+        .time("1m")
+        .build();
     private static final int SIZE = 2;
-    private static final String MESSAGE = "message";
+    private static final String CONTENT = "{\"message\": \"Sample message\"}";
     private static final IndexName INDEX_NAME = new IndexName("index");
     private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
 
@@ -74,119 +77,129 @@ class ScrolledSearchTest {
 
     @Test
     void scrollIterableShouldWorkWhenEmpty() {
-        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+        SearchRequest searchRequest = new SearchRequest.Builder()
+            .index(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
-            .source(new SearchSourceBuilder()
-                .query(QueryBuilders.matchAllQuery())
-                .size(SIZE));
+            .query(new MatchAllQuery.Builder().build()._toQuery())
+            .size(SIZE)
+            .build();
 
         assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
             .isEmpty();
     }
 
     @Test
-    void scrollIterableShouldWorkWhenOneElement() throws Exception {
+    void scrollIterableShouldWorkWhenOneElement() throws IOException {
         String id = "1";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
-        .block();
+                .document(new RawValue(CONTENT))
+                .build())
+            .block();
 
         openSearch.awaitForOpenSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
 
-        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+        SearchRequest searchRequest = new SearchRequest.Builder()
+            .index(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
-            .source(new SearchSourceBuilder()
-                .query(QueryBuilders.matchAllQuery())
-                .size(SIZE));
+            .query(new MatchAllQuery.Builder().build()._toQuery())
+            .size(SIZE)
+            .build();
 
         assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
-            .extracting(SearchHit::getId)
+            .extracting(Hit::id)
             .containsOnly(id);
     }
 
     @Test
-    void scrollIterableShouldWorkWhenSizeElement() {
+    void scrollIterableShouldWorkWhenSizeElement() throws IOException {
         String id1 = "1";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id1)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
+                .document(new RawValue(CONTENT))
+                .build())
             .block();
 
         String id2 = "2";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id2)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
+                .document(new RawValue(CONTENT))
+                .build())
             .block();
 
         openSearch.awaitForOpenSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
 
-        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+        SearchRequest searchRequest = new SearchRequest.Builder()
+            .index(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
-            .source(new SearchSourceBuilder()
-                .query(QueryBuilders.matchAllQuery())
-                .size(SIZE));
+            .query(new MatchAllQuery.Builder().build()._toQuery())
+            .size(SIZE)
+            .build();
 
         assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
-            .extracting(SearchHit::getId)
+            .extracting(Hit::id)
             .containsOnly(id1, id2);
     }
 
     @Test
-    void scrollIterableShouldWorkWhenMoreThanSizeElement() {
+    void scrollIterableShouldWorkWhenMoreThanSizeElement() throws IOException {
         String id1 = "1";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id1)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
+                .document(new RawValue(CONTENT))
+                .build())
             .block();
 
         String id2 = "2";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id2)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
+                .document(new RawValue(CONTENT))
+                .build())
             .block();
 
         String id3 = "3";
-        client.index(new IndexRequest(INDEX_NAME.getValue())
+        client.index(new IndexRequest.Builder<>()
+                .index(INDEX_NAME.getValue())
                 .id(id3)
-                .source(MESSAGE, "Sample message"),
-            RequestOptions.DEFAULT)
+                .document(new RawValue(CONTENT))
+                .build())
             .block();
 
         openSearch.awaitForOpenSearch();
         WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
 
-        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+        SearchRequest searchRequest = new SearchRequest.Builder()
+            .index(INDEX_NAME.getValue())
             .scroll(TIMEOUT)
-            .source(new SearchSourceBuilder()
-                .query(QueryBuilders.matchAllQuery())
-                .size(SIZE));
+            .query(new MatchAllQuery.Builder().build()._toQuery())
+            .size(SIZE)
+            .build();
 
         assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
-            .extracting(SearchHit::getId)
+            .extracting(Hit::id)
             .containsOnly(id1, id2, id3);
     }
 
-    private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) {
-        SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
-            .scroll(TIMEOUT)
-            .source(new SearchSourceBuilder()
-                .query(QueryBuilders.matchAllQuery()));
+    private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) throws IOException {
+        SearchRequest searchRequest = new SearchRequest.Builder()
+            .index(INDEX_NAME.getValue())
+            .query(new MatchAllQuery.Builder().build()._toQuery())
+            .build();
 
-        SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
-            .block()
-            .getHits()
-            .getHits();
+        List<String> hitIds = client.search(searchRequest)
+            .flatMapIterable(response -> response.hits().hits())
+            .map(Hit::id)
+            .collectList()
+            .block();
 
-        assertThat(hits)
-            .extracting(SearchHit::getId)
+        assertThat(hitIds)
             .contains(ids);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org