You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/06 03:02:40 UTC
[james-project] 01/12: JAMES-3771 Migrate backend opensearch high level to new java client
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c7cb996ee941b649f4d270ce4365aa7eaf910ed6
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Tue Jun 21 15:03:11 2022 +0700
JAMES-3771 Migrate backend opensearch high level to new java client
---
backends-common/opensearch/pom.xml | 15 +-
.../james/backends/opensearch/ClientProvider.java | 34 ++-
.../opensearch/DeleteByQueryPerformer.java | 41 ++--
.../james/backends/opensearch/DocumentId.java | 3 +-
.../backends/opensearch/IndexCreationFactory.java | 254 ++++++++++++---------
.../backends/opensearch/OpenSearchHealthCheck.java | 38 +--
.../backends/opensearch/OpenSearchIndexer.java | 121 ++++++----
.../opensearch/ReactorOpenSearchClient.java | 181 ++++++---------
.../james/backends/opensearch/RoutingKey.java | 3 +-
.../backends/opensearch/UpdatedRepresentation.java | 3 +-
.../backends/opensearch/search/ScrolledSearch.java | 74 +++---
.../ClientProviderImplConnectionContract.java | 12 +-
.../opensearch/DockerOpenSearchExtension.java | 42 ++--
.../opensearch/IndexCreationFactoryTest.java | 213 ++++++++---------
.../opensearch/OpenSearchHealthCheckTest.java | 52 ++---
.../backends/opensearch/OpenSearchIndexerTest.java | 76 +++---
.../opensearch/search/ScrolledSearchTest.java | 135 ++++++-----
17 files changed, 690 insertions(+), 607 deletions(-)
diff --git a/backends-common/opensearch/pom.xml b/backends-common/opensearch/pom.xml
index ce40d7dca4..a53014a685 100644
--- a/backends-common/opensearch/pom.xml
+++ b/backends-common/opensearch/pom.xml
@@ -47,14 +47,10 @@
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
-
- <!-- Prevents https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-28491 -->
<dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-cbor</artifactId>
- <version>2.13.4</version>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
</dependency>
-
<dependency>
<groupId>com.github.fge</groupId>
<artifactId>throwing-lambdas</artifactId>
@@ -86,7 +82,12 @@
</dependency>
<dependency>
<groupId>org.opensearch.client</groupId>
- <artifactId>opensearch-rest-high-level-client</artifactId>
+ <artifactId>opensearch-java</artifactId>
+ <version>2.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opensearch.client</groupId>
+ <artifactId>opensearch-rest-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
index 52bf765741..89a104bb8b 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ClientProvider.java
@@ -46,7 +46,9 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.opensearch.client.RestClient;
-import org.opensearch.client.RestHighLevelClient;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchAsyncClient;
+import org.opensearch.client.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,7 +183,8 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
private final OpenSearchConfiguration configuration;
- private final RestHighLevelClient openSearchRestHighLevelClient;
+ private final RestClient lowLevelRestClient;
+ private final OpenSearchAsyncClient openSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorOpenSearchClient client;
@@ -189,28 +192,35 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
public ClientProvider(OpenSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
- this.openSearchRestHighLevelClient = connect(configuration);
- this.client = new ReactorOpenSearchClient(this.openSearchRestHighLevelClient);
+ this.lowLevelRestClient = buildRestClient();
+ this.openSearchClient = connect();
+ this.client = new ReactorOpenSearchClient(this.openSearchClient, lowLevelRestClient);
}
- private RestHighLevelClient connect(OpenSearchConfiguration configuration) {
+ private RestClient buildRestClient() {
+ return RestClient.builder(hostsToHttpHosts())
+ .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
+ .build();
+ }
+
+ private OpenSearchAsyncClient connect() {
Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
boolean suppressLeadingZeroElements = true;
boolean suppressTrailingZeroElements = true;
return Mono.fromCallable(this::connectToCluster)
.doOnError(e -> LOGGER.warn("Error establishing OpenSearch connection. Next retry scheduled in {}",
DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
- .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.boundedElastic()))
+ .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.elastic()))
+ .publishOn(Schedulers.elastic())
.block();
}
- private RestHighLevelClient connectToCluster() {
+ private OpenSearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to OpenSearch service at {}", LocalDateTime.now());
- return new RestHighLevelClient(
- RestClient
- .builder(hostsToHttpHosts())
- .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure));
+ RestClientTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
+
+ return new OpenSearchAsyncClient(transport);
}
private HttpHost[] hostsToHttpHosts() {
@@ -226,6 +236,6 @@ public class ClientProvider implements Provider<ReactorOpenSearchClient> {
@PreDestroy
public void close() throws IOException {
- openSearchRestHighLevelClient.close();
+ lowLevelRestClient.close();
}
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
index a0261badd8..7ea40f627e 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DeleteByQueryPerformer.java
@@ -19,9 +19,10 @@
package org.apache.james.backends.opensearch;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.reindex.DeleteByQueryRequest;
+import java.io.IOException;
+
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
import reactor.core.publisher.Mono;
@@ -35,20 +36,32 @@ public class DeleteByQueryPerformer {
this.aliasName = aliasName;
}
- public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) {
- DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue());
- deleteRequest.setQuery(queryBuilder);
- deleteRequest.setRouting(routingKey.asString());
+ public Mono<Void> perform(Query query, RoutingKey routingKey) {
+ DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder()
+ .index(aliasName.getValue())
+ .query(query)
+ .routing(routingKey.asString())
+ .build();
- return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT)
- .then();
+ try {
+ return client.deleteByQuery(deleteRequest)
+ .then();
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
- public Mono<Void> perform(QueryBuilder queryBuilder) {
- DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(aliasName.getValue());
- deleteRequest.setQuery(queryBuilder);
+ public Mono<Void> perform(Query query) {
+ DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest.Builder()
+ .index(aliasName.getValue())
+ .query(query)
+ .build();
- return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT)
- .then();
+ try {
+ return client.deleteByQuery(deleteRequest)
+ .then();
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
index 42f650b03b..be9a994539 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/DocumentId.java
@@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch;
import java.util.Objects;
-import org.opensearch.common.Strings;
-
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
public class DocumentId {
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
index 48756d104c..f314b373cc 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/IndexCreationFactory.java
@@ -19,58 +19,76 @@
package org.apache.james.backends.opensearch;
-import static org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
-import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
-
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import javax.inject.Inject;
-import org.opensearch.OpenSearchStatusException;
-import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
-import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.indices.CreateIndexRequest;
-import org.opensearch.client.indices.GetIndexRequest;
-import org.opensearch.common.Strings;
-import org.opensearch.common.xcontent.XContentBuilder;
-import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.client.opensearch._types.OpenSearchException;
+import org.opensearch.client.opensearch._types.WaitForActiveShards;
+import org.opensearch.client.opensearch._types.analysis.Analyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomNormalizer;
+import org.opensearch.client.opensearch._types.analysis.Normalizer;
+import org.opensearch.client.opensearch._types.analysis.SnowballLanguage;
+import org.opensearch.client.opensearch._types.analysis.SnowballTokenFilter;
+import org.opensearch.client.opensearch._types.analysis.Tokenizer;
+import org.opensearch.client.opensearch._types.mapping.TypeMapping;
+import org.opensearch.client.opensearch.indices.CreateIndexRequest;
+import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
+import org.opensearch.client.opensearch.indices.ExistsRequest;
+import org.opensearch.client.opensearch.indices.IndexSettings;
+import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis;
+import org.opensearch.client.opensearch.indices.UpdateAliasesRequest;
+import org.opensearch.client.opensearch.indices.update_aliases.Action;
+import org.opensearch.client.opensearch.indices.update_aliases.AddAction;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
public class IndexCreationFactory {
- public static class IndexCreationCustomElement {
- public static IndexCreationCustomElement EMPTY = from("{}");
+ public static class IndexCreationCustomAnalyzer {
+ private final String key;
+ private final Analyzer analyzer;
- public static IndexCreationCustomElement from(String value) {
- try {
- new ObjectMapper().readTree(value);
- } catch (JsonProcessingException e) {
- throw new IllegalArgumentException("value must be a valid json");
- }
- return new IndexCreationCustomElement(value);
+ public IndexCreationCustomAnalyzer(String key, Analyzer analyzer) {
+ this.key = key;
+ this.analyzer = analyzer;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Analyzer getAnalyzer() {
+ return analyzer;
}
+ }
- private final String payload;
+ public static class IndexCreationCustomTokenizer {
+ private final String key;
+ private final Tokenizer tokenizer;
- IndexCreationCustomElement(String payload) {
- this.payload = payload;
+ public IndexCreationCustomTokenizer(String key, Tokenizer tokenizer) {
+ this.key = key;
+ this.tokenizer = tokenizer;
}
- public String getPayload() {
- return payload;
+ public String getKey() {
+ return key;
+ }
+
+ public Tokenizer getTokenizer() {
+ return tokenizer;
}
}
@@ -103,8 +121,8 @@ public class IndexCreationFactory {
private final int waitForActiveShards;
private final IndexName indexName;
private final ImmutableList.Builder<AliasName> aliases;
- private Optional<IndexCreationCustomElement> customAnalyzers;
- private Optional<IndexCreationCustomElement> customTokenizers;
+ private final ImmutableList.Builder<IndexCreationCustomAnalyzer> customAnalyzers;
+ private final ImmutableList.Builder<IndexCreationCustomTokenizer> customTokenizers;
FinalStage(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName) {
this.nbShards = nbShards;
@@ -112,8 +130,8 @@ public class IndexCreationFactory {
this.waitForActiveShards = waitForActiveShards;
this.indexName = indexName;
this.aliases = ImmutableList.builder();
- this.customAnalyzers = Optional.empty();
- this.customTokenizers = Optional.empty();
+ this.customAnalyzers = ImmutableList.builder();
+ this.customTokenizers = ImmutableList.builder();
}
public FinalStage addAlias(AliasName... aliases) {
@@ -126,29 +144,29 @@ public class IndexCreationFactory {
return this;
}
- public FinalStage customAnalyzers(IndexCreationCustomElement customAnalyzers) {
- this.customAnalyzers = Optional.of(customAnalyzers);
+ public FinalStage customAnalyzers(IndexCreationCustomAnalyzer... customAnalyzers) {
+ this.customAnalyzers.add(customAnalyzers);
return this;
}
- public FinalStage customTokenizers(IndexCreationCustomElement customTokenizers) {
- this.customTokenizers = Optional.of(customTokenizers);
+ public FinalStage customTokenizers(IndexCreationCustomTokenizer... customTokenizers) {
+ this.customTokenizers.add(customTokenizers);
return this;
}
public IndexCreationPerformer build() {
- return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers, customTokenizers);
+ return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build(), customAnalyzers.build(), customTokenizers.build());
}
public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client) {
return build().createIndexAndAliases(client, Optional.empty(), Optional.empty());
}
- public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, XContentBuilder mappingContent) {
+ public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, TypeMapping mappingContent) {
return build().createIndexAndAliases(client, Optional.empty(), Optional.of(mappingContent));
}
- public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings, Optional<XContentBuilder> mappingContent) {
+ public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<IndexSettings> indexSettings, Optional<TypeMapping> mappingContent) {
return build().createIndexAndAliases(client, indexSettings, mappingContent);
}
}
@@ -164,11 +182,12 @@ public class IndexCreationFactory {
private final int waitForActiveShards;
private final IndexName indexName;
private final ImmutableList<AliasName> aliases;
- private final Optional<IndexCreationCustomElement> customAnalyzers;
- private final Optional<IndexCreationCustomElement> customTokenizers;
+ private final ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers;
+ private final ImmutableList<IndexCreationCustomTokenizer> customTokenizers;
- private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName, ImmutableList<AliasName> aliases,
- Optional<IndexCreationCustomElement> customAnalyzers, Optional<IndexCreationCustomElement> customTokenizers) {
+ private IndexCreationPerformer(int nbShards, int nbReplica, int waitForActiveShards, IndexName indexName,
+ ImmutableList<AliasName> aliases, ImmutableList<IndexCreationCustomAnalyzer> customAnalyzers,
+ ImmutableList<IndexCreationCustomTokenizer> customTokenizers) {
this.nbShards = nbShards;
this.nbReplica = nbReplica;
this.waitForActiveShards = waitForActiveShards;
@@ -178,8 +197,9 @@ public class IndexCreationFactory {
this.customTokenizers = customTokenizers;
}
- public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client, Optional<XContentBuilder> indexSettings,
- Optional<XContentBuilder> mappingContent) {
+ public ReactorOpenSearchClient createIndexAndAliases(ReactorOpenSearchClient client,
+ Optional<IndexSettings> indexSettings,
+ Optional<TypeMapping> mappingContent) {
Preconditions.checkNotNull(indexName);
try {
createIndexIfNeeded(client, indexName, indexSettings.orElse(generateSetting()), mappingContent);
@@ -193,31 +213,41 @@ public class IndexCreationFactory {
private void createAliasIfNeeded(ReactorOpenSearchClient client, IndexName indexName, AliasName aliasName) throws IOException {
if (!aliasExist(client, aliasName)) {
- client.indices()
- .updateAliases(
- new IndicesAliasesRequest().addAliasAction(
- new AliasActions(AliasActions.Type.ADD)
+ client.updateAliases(
+ new UpdateAliasesRequest.Builder()
+ .actions(new Action.Builder()
+ .add(new AddAction.Builder()
.index(indexName.getValue())
- .alias(aliasName.getValue())),
- RequestOptions.DEFAULT);
+ .alias(aliasName.getValue())
+ .build())
+ .build())
+ .build())
+ .block();
}
}
private boolean aliasExist(ReactorOpenSearchClient client, AliasName aliasName) throws IOException {
- return client.indices()
- .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT);
+ return client.aliasExists(new ExistsAliasRequest.Builder()
+ .name(aliasName.getValue())
+ .build())
+ .map(BooleanResponse::value)
+ .block();
}
- private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, XContentBuilder settings, Optional<XContentBuilder> mappingContent) throws IOException {
+ private void createIndexIfNeeded(ReactorOpenSearchClient client, IndexName indexName, IndexSettings settings, Optional<TypeMapping> mappingContent) throws IOException {
try {
if (!indexExists(client, indexName)) {
- CreateIndexRequest request = new CreateIndexRequest(indexName.getValue()).source(settings);
- mappingContent.ifPresent(request::mapping);
- client.indices().create(
- request,
- RequestOptions.DEFAULT);
+ CreateIndexRequest.Builder request = new CreateIndexRequest.Builder()
+ .index(indexName.getValue())
+ .waitForActiveShards(new WaitForActiveShards.Builder()
+ .count(waitForActiveShards)
+ .build())
+ .settings(settings);
+ mappingContent.ifPresent(request::mappings);
+ client.createIndex(request.build())
+ .block();
}
- } catch (OpenSearchStatusException exception) {
+ } catch (OpenSearchException exception) {
if (exception.getMessage().contains(INDEX_ALREADY_EXISTS_EXCEPTION_MESSAGE)) {
LOGGER.info("Index [{}] already exists", indexName.getValue());
} else {
@@ -227,60 +257,66 @@ public class IndexCreationFactory {
}
private boolean indexExists(ReactorOpenSearchClient client, IndexName indexName) throws IOException {
- return client.indices().exists(new GetIndexRequest(indexName.getValue()), RequestOptions.DEFAULT);
+ return client.indexExists(new ExistsRequest.Builder()
+ .index(indexName.getValue())
+ .build())
+ .map(BooleanResponse::value)
+ .block();
+ }
+
+ private IndexSettings generateSetting() {
+ return new IndexSettings.Builder()
+ .numberOfShards(Integer.toString(nbShards))
+ .numberOfReplicas(Integer.toString(nbReplica))
+ .analysis(new IndexSettingsAnalysis.Builder()
+ .normalizer(CASE_INSENSITIVE, new Normalizer.Builder()
+ .custom(generateNormalizer())
+ .build())
+ .analyzer(generateAnalyzers())
+ .tokenizer(generateTokenizers())
+ .build())
+ .build();
}
- private XContentBuilder generateSetting() throws IOException {
- return jsonBuilder()
- .startObject()
- .startObject("settings")
- .field("number_of_shards", nbShards)
- .field("number_of_replicas", nbReplica)
- .field("index.write.wait_for_active_shards", waitForActiveShards)
- .startObject("analysis")
- .startObject("normalizer")
- .startObject(CASE_INSENSITIVE)
- .field("type", "custom")
- .startArray("char_filter")
- .endArray()
- .startArray("filter")
- .value("lowercase")
- .value("asciifolding")
- .endArray()
- .endObject()
- .endObject()
- .rawField(ANALYZER, generateAnalyzers(), XContentType.JSON)
- .rawField(TOKENIZER, generateTokenizer(), XContentType.JSON)
- .endObject()
- .endObject()
- .endObject();
+ private CustomNormalizer generateNormalizer() {
+ return new CustomNormalizer.Builder()
+ .filter("lowercase", "asciifolding")
+ .build();
}
- private String analyzerDefault() throws IOException {
- XContentBuilder analyzerBuilder = jsonBuilder()
- .startObject()
- .startObject(KEEP_MAIL_AND_URL)
- .field("tokenizer", "uax_url_email")
- .startArray("filter")
- .value("lowercase")
- .value("stop")
- .endArray()
- .endObject()
- .endObject();
-
- return Strings.toString(analyzerBuilder);
+ private SnowballTokenFilter generateFilter() {
+ return new SnowballTokenFilter.Builder()
+ .language(SnowballLanguage.English)
+ .build();
}
- private InputStream generateAnalyzers() {
- return new ByteArrayInputStream(customAnalyzers.orElseGet(Throwing.supplier(() -> IndexCreationCustomElement.from(analyzerDefault())).sneakyThrow())
- .getPayload()
- .getBytes(StandardCharsets.UTF_8));
+ private Map<String, Analyzer> defaultAnalyzers() {
+ return ImmutableMap.of(
+ KEEP_MAIL_AND_URL, new Analyzer.Builder().custom(
+ new CustomAnalyzer.Builder()
+ .tokenizer("uax_url_email")
+ .filter("lowercase", "stop")
+ .build())
+ .build()
+ );
+ }
+
+ private Map<String, Analyzer> generateAnalyzers() {
+ if (customAnalyzers.isEmpty()) {
+ return defaultAnalyzers();
+ }
+ return customAnalyzers.stream()
+ .collect(Collectors.toMap(
+ IndexCreationCustomAnalyzer::getKey,
+ IndexCreationCustomAnalyzer::getAnalyzer));
}
- private InputStream generateTokenizer() {
- return new ByteArrayInputStream(customTokenizers.orElse(IndexCreationCustomElement.EMPTY)
- .getPayload()
- .getBytes(StandardCharsets.UTF_8));
+ private Map<String, Tokenizer> generateTokenizers() {
+ return customTokenizers.stream()
+ .collect(Collectors.toMap(
+ IndexCreationCustomTokenizer::getKey,
+ IndexCreationCustomTokenizer::getTokenizer
+ ));
}
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
index de26140daf..49874fc8f9 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchHealthCheck.java
@@ -19,7 +19,10 @@
package org.apache.james.backends.opensearch;
+import java.io.IOException;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.inject.Inject;
@@ -27,9 +30,8 @@ import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.core.healthcheck.ComponentName;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.core.healthcheck.Result;
-import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.client.Requests;
+import org.opensearch.client.opensearch.cluster.HealthRequest;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
import com.google.common.annotations.VisibleForTesting;
@@ -54,24 +56,30 @@ public class OpenSearchHealthCheck implements HealthCheck {
@Override
public Mono<Result> check() {
- String[] indices = indexNames.stream()
+ List<String> indices = indexNames.stream()
.map(IndexName::getValue)
- .toArray(String[]::new);
- ClusterHealthRequest request = Requests.clusterHealthRequest(indices);
+ .collect(Collectors.toList());
+ HealthRequest request = new HealthRequest.Builder()
+ .index(indices)
+ .build();
- return client.health(request)
- .map(this::toHealthCheckResult)
- .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
+ try {
+ return client.health(request)
+ .map(this::toHealthCheckResult)
+ .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
@VisibleForTesting
- Result toHealthCheckResult(ClusterHealthResponse response) {
- switch (response.getStatus()) {
- case GREEN:
- case YELLOW:
+ Result toHealthCheckResult(HealthResponse response) {
+ switch (response.status()) {
+ case Green:
+ case Yellow:
return Result.healthy(COMPONENT_NAME);
- case RED:
- return Result.unhealthy(COMPONENT_NAME, response.getClusterName() + " status is RED");
+ case Red:
+ return Result.unhealthy(COMPONENT_NAME, response.clusterName() + " status is RED");
default:
throw new NotImplementedException("Un-handled OpenSearch cluster status");
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
index 9a0002cf91..19c46e0dac 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/OpenSearchIndexer.java
@@ -18,24 +18,23 @@
****************************************************************/
package org.apache.james.backends.opensearch;
+import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.get.GetRequest;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.update.UpdateRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.ValidationException;
-import org.opensearch.common.xcontent.XContentType;
-import org.opensearch.index.query.QueryBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.BulkResponse;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.IndexResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.util.RawValue;
import com.google.common.base.Preconditions;
import reactor.core.publisher.Mono;
@@ -59,11 +58,17 @@ public class OpenSearchIndexer {
public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
checkArgument(content);
logContent(id, content);
- return client.index(new IndexRequest(aliasName.getValue())
+
+ try {
+ return client.index(new IndexRequest.Builder<>()
+ .index(aliasName.getValue())
.id(id.asString())
- .source(content, XContentType.JSON)
- .routing(routingKey.asString()),
- RequestOptions.DEFAULT);
+ .document(new RawValue(content))
+ .routing(routingKey.asString())
+ .build());
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
private void logContent(DocumentId id, String content) {
@@ -75,50 +80,70 @@ public class OpenSearchIndexer {
public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) {
Preconditions.checkNotNull(updatedDocumentParts);
Preconditions.checkNotNull(routingKey);
- BulkRequest request = new BulkRequest();
- updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
- new UpdateRequest(aliasName.getValue(),
- updatedDocumentPart.getId().asString())
- .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)
- .routing(routingKey.asString())));
-
- return client.bulk(request, RequestOptions.DEFAULT)
- .onErrorResume(ValidationException.class, exception -> {
- LOGGER.warn("Error while updating index", exception);
- return Mono.empty();
- });
+
+ if (updatedDocumentParts.isEmpty()) {
+ return Mono.empty();
+ }
+
+ BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
+ updatedDocumentParts.forEach(updatedDocumentPart -> bulkBuilder.operations(
+ op -> op.update(idx -> idx
+ .index(aliasName.getValue())
+ .id(updatedDocumentPart.getId().asString())
+ .document(Collections.singletonMap("doc", new RawValue(updatedDocumentPart.getUpdatedDocumentPart())))
+ .routing(routingKey.asString())
+ )));
+
+ try {
+ return client.bulk(bulkBuilder.build());
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) {
- BulkRequest request = new BulkRequest();
- ids.forEach(id -> request.add(
- new DeleteRequest(aliasName.getValue())
+ if (ids.isEmpty()) {
+ return Mono.empty();
+ }
+
+ BulkRequest.Builder bulkBuilder = new BulkRequest.Builder();
+
+ ids.forEach(id -> bulkBuilder.operations(
+ op -> op.delete(idx -> idx
+ .index(aliasName.getValue())
.id(id.asString())
- .routing(routingKey.asString())));
+ .routing(routingKey.asString())
+ )));
- return client.bulk(request, RequestOptions.DEFAULT)
- .onErrorResume(ValidationException.class, exception -> {
- LOGGER.warn("Error while deleting index", exception);
- return Mono.empty();
- });
+ try {
+ return client.bulk(bulkBuilder.build());
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
}
- public Mono<Void> deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
- return deleteByQueryPerformer.perform(queryBuilder, routingKey);
+ public Mono<Void> deleteAllMatchingQuery(Query query, RoutingKey routingKey) {
+ return deleteByQueryPerformer.perform(query, routingKey);
}
private void checkArgument(String content) {
Preconditions.checkArgument(content != null, "content should be provided");
}
- public Mono<GetResponse> get(DocumentId id, RoutingKey routingKey) {
- return Mono.fromRunnable(() -> {
- Preconditions.checkNotNull(id);
- Preconditions.checkNotNull(routingKey);
- })
- .then(client.get(new GetRequest(aliasName.getValue())
- .id(id.asString())
- .routing(routingKey.asString()),
- RequestOptions.DEFAULT));
+ /**
+ * Fetches a single document by id through the alias, using the given routing key.
+ *
+ * Both arguments are validated before the request is built: the request builder
+ * dereferences them eagerly, so a check deferred to subscription time could
+ * never be reached with a null argument.
+ *
+ * @throws NullPointerException if id or routingKey is null
+ */
+ public Mono<GetResponse<ObjectNode>> get(DocumentId id, RoutingKey routingKey) {
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(routingKey);
+
+ try {
+ return client.get(
+ new GetRequest.Builder()
+ .index(aliasName.getValue())
+ .id(id.asString())
+ .routing(routingKey.asString())
+ .build());
+ } catch (IOException e) {
+ // report the checked exception on the reactive error channel
+ return Mono.error(e);
+ }
 }
}
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
index 1c739fcf37..d2818e2ad3 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/ReactorOpenSearchClient.java
@@ -20,160 +20,129 @@
package org.apache.james.backends.opensearch;
import java.io.IOException;
-import java.util.function.Consumer;
-
-import org.opensearch.action.ActionListener;
-import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
-import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
-import org.opensearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.delete.DeleteResponse;
-import org.opensearch.action.explain.ExplainRequest;
-import org.opensearch.action.explain.ExplainResponse;
-import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest;
-import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse;
-import org.opensearch.action.get.GetRequest;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.search.ClearScrollRequest;
-import org.opensearch.action.search.ClearScrollResponse;
-import org.opensearch.action.search.MultiSearchRequest;
-import org.opensearch.action.search.MultiSearchResponse;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.action.search.SearchScrollRequest;
-import org.opensearch.action.support.master.AcknowledgedResponse;
-import org.opensearch.client.IndicesClient;
-import org.opensearch.client.RequestOptions;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
import org.opensearch.client.RestClient;
-import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.client.core.MainResponse;
-import org.opensearch.index.rankeval.RankEvalRequest;
-import org.opensearch.index.rankeval.RankEvalResponse;
-import org.opensearch.index.reindex.BulkByScrollResponse;
-import org.opensearch.index.reindex.DeleteByQueryRequest;
-import org.opensearch.script.mustache.MultiSearchTemplateRequest;
-import org.opensearch.script.mustache.MultiSearchTemplateResponse;
-import org.opensearch.script.mustache.SearchTemplateRequest;
-import org.opensearch.script.mustache.SearchTemplateResponse;
+import org.opensearch.client.opensearch.OpenSearchAsyncClient;
+import org.opensearch.client.opensearch.cluster.HealthRequest;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.BulkResponse;
+import org.opensearch.client.opensearch.core.ClearScrollRequest;
+import org.opensearch.client.opensearch.core.ClearScrollResponse;
+import org.opensearch.client.opensearch.core.DeleteByQueryRequest;
+import org.opensearch.client.opensearch.core.DeleteByQueryResponse;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.DeleteResponse;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.IndexResponse;
+import org.opensearch.client.opensearch.core.InfoResponse;
+import org.opensearch.client.opensearch.core.ScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.SearchResponse;
+import org.opensearch.client.opensearch.indices.CreateIndexRequest;
+import org.opensearch.client.opensearch.indices.CreateIndexResponse;
+import org.opensearch.client.opensearch.indices.ExistsAliasRequest;
+import org.opensearch.client.opensearch.indices.ExistsRequest;
+import org.opensearch.client.opensearch.indices.UpdateAliasesRequest;
+import org.opensearch.client.opensearch.indices.UpdateAliasesResponse;
+import org.opensearch.client.transport.endpoints.BooleanResponse;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
public class ReactorOpenSearchClient implements AutoCloseable {
- private final RestHighLevelClient client;
+ private final OpenSearchAsyncClient client;
+ private final RestClient lowLevelRestClient;
- public ReactorOpenSearchClient(RestHighLevelClient client) {
+ public ReactorOpenSearchClient(OpenSearchAsyncClient client, RestClient lowLevelRestClient) {
this.client = client;
+ this.lowLevelRestClient = lowLevelRestClient;
}
- public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) {
- return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener));
- }
-
- public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) {
- return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
- }
-
- public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
- return client.delete(deleteRequest, options);
- }
-
- public Mono<BulkByScrollResponse> deleteByQuery(DeleteByQueryRequest deleteRequest, RequestOptions options) {
- return toReactor(listener -> client.deleteByQueryAsync(deleteRequest, options, listener));
+ public Mono<BulkResponse> bulk(BulkRequest bulkRequest) throws IOException {
+ return toReactor(client.bulk(bulkRequest));
}
- public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
- return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+ public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
+ return toReactor(client.clearScroll(clearScrollRequest));
}
- public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
- return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+ public Mono<DeleteResponse> delete(DeleteRequest deleteRequest) throws IOException {
+ return toReactor(client.delete(deleteRequest));
}
- public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
- return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+ public Mono<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest deleteRequest) throws IOException {
+ return toReactor(client.deleteByQuery(deleteRequest));
}
public RestClient getLowLevelClient() {
- return client.getLowLevelClient();
+ return lowLevelRestClient;
}
- public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
- return toReactor(listener -> client.getScriptAsync(request, options, listener));
+ public <T> Mono<IndexResponse> index(IndexRequest<T> indexRequest) throws IOException {
+ return toReactor(client.index(indexRequest));
}
- public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
- return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+ public Mono<BooleanResponse> indexExists(ExistsRequest existsRequest) throws IOException {
+ return toReactor(client.indices().exists(existsRequest));
}
- public IndicesClient indices() {
- return client.indices();
+ public Mono<BooleanResponse> aliasExists(ExistsAliasRequest existsAliasRequest) throws IOException {
+ return toReactor(client.indices().existsAlias(existsAliasRequest));
}
- public MainResponse info(RequestOptions options) throws IOException {
- return client.info(options);
+ public Mono<CreateIndexResponse> createIndex(CreateIndexRequest indexRequest) throws IOException {
+ return toReactor(client.indices().create(indexRequest));
}
- public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
- return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+ public Mono<UpdateAliasesResponse> updateAliases(UpdateAliasesRequest updateAliasesRequest) throws IOException {
+ return toReactor(client.indices().updateAliases(updateAliasesRequest));
}
- public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
- return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+ public Mono<InfoResponse> info() throws IOException {
+ return toReactor(client.info());
}
- public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
- return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+ public Mono<ScrollResponse<ObjectNode>> scroll(ScrollRequest scrollRequest) throws IOException {
+ return toReactor(client.scroll(scrollRequest, ObjectNode.class));
}
- public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
- return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
+ public Mono<SearchResponse<ObjectNode>> search(SearchRequest searchRequest) throws IOException {
+ return toReactor(client.search(searchRequest, ObjectNode.class));
}
- public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) {
- return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
+ public Mono<HealthResponse> health(HealthRequest request) throws IOException {
+ return toReactor(client.cluster().health(request));
}
- public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) {
- return toReactor(listener -> client.cluster()
- .healthAsync(request, RequestOptions.DEFAULT, listener));
- }
-
- public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
- return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
- }
-
- public Mono<GetResponse> get(GetRequest getRequest, RequestOptions options) {
- return toReactor(listener -> client.getAsync(getRequest, options, listener));
+ public Mono<GetResponse<ObjectNode>> get(GetRequest getRequest) throws IOException {
+ return toReactor(client.get(getRequest, ObjectNode.class));
}
@Override
public void close() throws IOException {
- client.close();
+ lowLevelRestClient.close();
}
- private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
- return Mono.<T>create(sink -> async.accept(getListener(sink)))
- .publishOn(Schedulers.boundedElastic());
+ /**
+ * Adapts a CompletableFuture handed back by the async client to a Mono and
+ * publishes its signals on the bounded elastic scheduler.
+ */
+ private static <T> Mono<T> toReactor(CompletableFuture<T> async) {
+ return Mono.<T>create(sink -> async.whenComplete(getFuture(sink)))
+ // Schedulers.elastic() is deprecated and unbounded; keep the bounded scheduler the previous implementation used
+ .publishOn(Schedulers.boundedElastic());
 }
- private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
- return new ActionListener<T>() {
- @Override
- public void onResponse(T t) {
- sink.success(t);
- }
-
- @Override
- public void onFailure(Exception e) {
- sink.error(e);
+ /**
+ * Builds the completion callback that forwards a future's outcome to the sink:
+ * the value on success, the throwable on failure.
+ */
+ private static <T> BiConsumer<? super T, ? super Throwable> getFuture(MonoSink<T> sink) {
+ return (result, failure) -> {
+ if (failure == null) {
+ sink.success(result);
+ } else {
+ sink.error(failure);
 }
 };
 }
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
index bb823f8e9f..4dedbc67fe 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/RoutingKey.java
@@ -21,9 +21,8 @@ package org.apache.james.backends.opensearch;
import java.util.Objects;
-import org.opensearch.common.Strings;
-
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
public class RoutingKey {
public interface Factory<T> {
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
index f221ccd45c..76c415b9e1 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/UpdatedRepresentation.java
@@ -20,10 +20,9 @@ package org.apache.james.backends.opensearch;
import java.util.Objects;
-import org.opensearch.common.Strings;
-
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
public class UpdatedRepresentation {
private final DocumentId id;
diff --git a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
index ecb9cf955a..3a5d22d1ce 100644
--- a/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
+++ b/backends-common/opensearch/src/main/java/org/apache/james/backends/opensearch/search/ScrolledSearch.java
@@ -19,19 +19,20 @@
package org.apache.james.backends.opensearch.search;
+import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.james.backends.opensearch.ReactorOpenSearchClient;
-import org.opensearch.action.search.ClearScrollRequest;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.action.search.SearchScrollRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.search.SearchHit;
-
+import org.opensearch.client.opensearch._types.Time;
+import org.opensearch.client.opensearch.core.ClearScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollRequest;
+import org.opensearch.client.opensearch.core.ScrollResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
@@ -39,8 +40,9 @@ import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
public class ScrolledSearch {
-
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+ private static final Time TIMEOUT = new Time.Builder()
+ .time("1m")
+ .build();
private final ReactorOpenSearchClient client;
private final SearchRequest searchRequest;
@@ -50,12 +52,12 @@ public class ScrolledSearch {
this.searchRequest = searchRequest;
}
- public Flux<SearchHit> searchHits() {
+ public Flux<Hit<ObjectNode>> searchHits() {
return searchResponses()
- .concatMap(searchResponse -> Flux.just(searchResponse.getHits().getHits()));
+ .concatMap(searchResponse -> Flux.fromIterable(searchResponse.hits().hits()));
}
- public Flux<SearchResponse> searchResponses() {
+ private Flux<ScrollResponse<ObjectNode>> searchResponses() {
return Flux.push(sink -> {
AtomicReference<Optional<String>> scrollId = new AtomicReference<>(Optional.empty());
sink.onRequest(numberOfRequestedElements -> next(sink, scrollId, numberOfRequestedElements));
@@ -64,17 +66,16 @@ public class ScrolledSearch {
});
}
- private void next(FluxSink<SearchResponse> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) {
+ private void next(FluxSink<ScrollResponse<ObjectNode>> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) {
if (numberOfRequestedElements <= 0) {
return;
}
- Consumer<SearchResponse> onResponse = searchResponse -> {
- scrollId.set(Optional.of(searchResponse.getScrollId()));
+ Consumer<ScrollResponse<ObjectNode>> onResponse = searchResponse -> {
+ scrollId.set(Optional.of(searchResponse.scrollId()));
sink.next(searchResponse);
- boolean noHitsLeft = searchResponse.getHits().getHits().length == 0;
- if (noHitsLeft) {
+ if (searchResponse.hits().hits().isEmpty()) {
sink.complete();
} else {
next(sink, scrollId, numberOfRequestedElements - 1);
@@ -87,22 +88,33 @@ public class ScrolledSearch {
.subscribe(onResponse, onFailure);
}
- private Mono<SearchResponse> buildRequest(Optional<String> scrollId) {
- return scrollId.map(id ->
- client.scroll(
- new SearchScrollRequest()
- .scrollId(scrollId.get())
- .scroll(TIMEOUT),
- RequestOptions.DEFAULT))
- .orElseGet(() -> client.search(searchRequest, RequestOptions.DEFAULT));
+ /**
+ * Issues the initial search when no scroll id is known yet, otherwise fetches
+ * the next page of the open scroll. The initial search response is re-wrapped
+ * as a ScrollResponse so both paths yield the same type.
+ *
+ * An IOException raised while submitting either request is returned as an
+ * error Mono rather than thrown, so it reaches the subscriber's failure handler.
+ */
+ private Mono<ScrollResponse<ObjectNode>> buildRequest(Optional<String> scrollId) {
+ try {
+ if (scrollId.isPresent()) {
+ return client.scroll(new ScrollRequest.Builder()
+ .scrollId(scrollId.get())
+ .scroll(TIMEOUT)
+ .build());
+ }
+ return client.search(searchRequest)
+ .map(response -> new ScrollResponse.Builder<ObjectNode>()
+ .scrollId(response.scrollId())
+ .hits(response.hits())
+ .took(response.took())
+ .timedOut(response.timedOut())
+ .shards(response.shards())
+ .build());
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
 }
public void close(AtomicReference<Optional<String>> scrollId) {
- scrollId.get().map(id -> {
- ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
- clearScrollRequest.addScrollId(id);
- return clearScrollRequest;
- }).ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe()).sneakyThrow());
+ scrollId.get().map(id -> new ClearScrollRequest.Builder()
+ .scrollId(id)
+ .build())
+ .ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest ->
+ client.clearScroll(clearScrollRequest).subscribe()).sneakyThrow());
}
}
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
index 9434324172..9f89156f42 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/ClientProviderImplConnectionContract.java
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.james.backends.opensearch.OpenSearchClusterExtension.OpenSearchCluster;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.SearchRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +74,9 @@ interface ClientProviderImplConnectionContract {
default boolean isConnected(ClientProvider clientProvider) {
try (ReactorOpenSearchClient client = clientProvider.get()) {
client.search(
- new SearchRequest()
- .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
- RequestOptions.DEFAULT).block();
+ new SearchRequest.Builder()
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .build()).block();
return true;
} catch (Exception e) {
LOGGER.info("Caught exception while trying to connect", e);
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
index 34a105b5e6..021a60d790 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/DockerOpenSearchExtension.java
@@ -19,18 +19,14 @@
package org.apache.james.backends.opensearch;
-import java.time.Duration;
-
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.SearchRequest;
import org.testcontainers.shaded.org.awaitility.Awaitility;
public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachCallback, ParameterResolver {
@@ -57,20 +53,20 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
}
@Override
- public void clean(DockerOpenSearch elasticSearch) {
+ public void clean(DockerOpenSearch openSearch) {
Awaitility.await()
.until(() -> {
- elasticSearch.flushIndices();
- ReactorOpenSearchClient client = elasticSearch.clientProvider().get();
+ openSearch.flushIndices();
+ ReactorOpenSearchClient client = openSearch.clientProvider().get();
new DeleteByQueryPerformer(client, aliasName)
- .perform(QueryBuilders.matchAllQuery())
+ .perform(new MatchAllQuery.Builder().build()._toQuery())
.block();
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery()));
- elasticSearch.flushIndices();
- boolean result = client.search(new SearchRequest(searchRequest), RequestOptions.DEFAULT)
- .map(searchResponse -> searchResponse.getHits().getHits().length)
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .build();
+ openSearch.flushIndices();
+ boolean result = client.search(searchRequest)
+ .map(searchResponse -> searchResponse.hits().hits().size())
.block() == 0;
try {
@@ -83,7 +79,7 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
}
}
- private final DockerOpenSearch elasticSearch = DockerOpenSearchSingleton.INSTANCE;
+ private final DockerOpenSearch openSearch = DockerOpenSearchSingleton.INSTANCE;
private final CleanupStrategy cleanupStrategy;
public DockerOpenSearchExtension() {
@@ -96,13 +92,13 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
@Override
public void afterEach(ExtensionContext context) {
- cleanupStrategy.clean(elasticSearch);
+ cleanupStrategy.clean(openSearch);
}
@Override
public void beforeEach(ExtensionContext extensionContext) {
- if (!elasticSearch.isRunning()) {
- elasticSearch.unpause();
+ if (!openSearch.isRunning()) {
+ openSearch.unpause();
}
awaitForOpenSearch();
}
@@ -114,14 +110,14 @@ public class DockerOpenSearchExtension implements AfterEachCallback, BeforeEachC
@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
- return elasticSearch;
+ return openSearch;
}
public void awaitForOpenSearch() {
- elasticSearch.flushIndices();
+ openSearch.flushIndices();
}
public DockerOpenSearch getDockerOpenSearch() {
- return elasticSearch;
+ return openSearch;
}
}
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
index 8cda09676f..2b169aaaa5 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/IndexCreationFactoryTest.java
@@ -19,78 +19,77 @@
package org.apache.james.backends.opensearch;
-import static org.apache.james.backends.opensearch.IndexCreationFactory.ANALYZER;
-import static org.apache.james.backends.opensearch.IndexCreationFactory.TOKENIZER;
-import static org.apache.james.backends.opensearch.IndexCreationFactory.TYPE;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.util.Optional;
-import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomElement;
+import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomAnalyzer;
+import org.apache.james.backends.opensearch.IndexCreationFactory.IndexCreationCustomTokenizer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.OpenSearchStatusException;
-import org.opensearch.common.xcontent.XContentBuilder;
+import org.opensearch.client.opensearch._types.analysis.Analyzer;
+import org.opensearch.client.opensearch._types.analysis.CustomAnalyzer;
+import org.opensearch.client.opensearch._types.analysis.NGramTokenFilter;
+import org.opensearch.client.opensearch._types.analysis.PatternTokenizer;
+import org.opensearch.client.opensearch._types.analysis.TokenFilter;
+import org.opensearch.client.opensearch._types.analysis.TokenFilterDefinition;
+import org.opensearch.client.opensearch._types.analysis.Tokenizer;
+import org.opensearch.client.opensearch._types.analysis.TokenizerDefinition;
+import org.opensearch.client.opensearch.indices.IndexSettings;
+import org.opensearch.client.opensearch.indices.IndexSettingsAnalysis;
class IndexCreationFactoryTest {
private static final IndexName INDEX_NAME = new IndexName("index");
private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
- public static XContentBuilder getValidIndexSetting() throws IOException {
- return jsonBuilder()
- .startObject()
- .startObject("settings")
- .startObject("index")
- .field("max_ngram_diff", 10)
- .endObject()
- .startObject("analysis")
- .startObject(ANALYZER)
- .startObject("email_ngram_filter_analyzer")
- .field(TOKENIZER, "uax_url_email")
- .startArray("filter")
- .value("ngram_filter")
- .endArray()
- .endObject()
- .endObject()
- .startObject("filter")
- .startObject("ngram_filter")
- .field(TYPE, "ngram")
- .field("min_gram", 3)
- .field("max_gram", 13)
- .endObject()
- .endObject()
- .endObject()
- .endObject()
- .endObject();
+ // Index settings with max_ngram_diff raised to 10, plus the email ngram analyzer and its ngram token filter.
+ public static IndexSettings getValidIndexSetting() {
+ IndexSettings indexLevel = new IndexSettings.Builder()
+ .maxNgramDiff(10)
+ .build();
+ Analyzer emailAnalyzer = new Analyzer.Builder()
+ .custom(generateAnalyzer())
+ .build();
+ TokenFilter ngramFilter = new TokenFilter.Builder()
+ .definition(new TokenFilterDefinition.Builder()
+ .ngram(generateFilter())
+ .build())
+ .build();
+ IndexSettingsAnalysis analysis = new IndexSettingsAnalysis.Builder()
+ .analyzer("email_ngram_filter_analyzer", emailAnalyzer)
+ .filter("ngram_filter", ngramFilter)
+ .build();
+
+ return new IndexSettings.Builder()
+ .index(indexLevel)
+ .analysis(analysis)
+ .build();
 }
- public static XContentBuilder getInvalidIndexSetting() throws IOException {
- return jsonBuilder()
- .startObject()
- .startObject("settings")
- .startObject("analysis")
- .startObject(ANALYZER)
- .startObject("email_ngram_filter_analyzer")
- .field(TOKENIZER, "uax_url_email")
- .startArray("filter")
- .value("ngram_filter")
- .endArray()
- .endObject()
- .endObject()
- .startObject("filter")
- .startObject("ngram_filter")
- .field(TYPE, "ngram")
- .field("min_gram", 3)
- .field("max_gram", 13)
- .endObject()
- .endObject()
- .endObject()
- .endObject()
- .endObject();
+ // Analysis-only settings: the email ngram analyzer and its ngram token filter, with no index-level max_ngram_diff.
+ public static IndexSettings getInvalidIndexSetting() {
+ Analyzer emailAnalyzer = new Analyzer.Builder()
+ .custom(generateAnalyzer())
+ .build();
+ TokenFilter ngramFilter = new TokenFilter.Builder()
+ .definition(new TokenFilterDefinition.Builder()
+ .ngram(generateFilter())
+ .build())
+ .build();
+ IndexSettingsAnalysis analysis = new IndexSettingsAnalysis.Builder()
+ .analyzer("email_ngram_filter_analyzer", emailAnalyzer)
+ .filter("ngram_filter", ngramFilter)
+ .build();
+
+ return new IndexSettings.Builder()
+ .analysis(analysis)
+ .build();
+ }
+
+ // Custom analyzer: uax_url_email tokenizer followed by the ngram_filter token filter.
+ private static CustomAnalyzer generateAnalyzer() {
+ CustomAnalyzer.Builder analyzer = new CustomAnalyzer.Builder();
+ analyzer.tokenizer("uax_url_email");
+ analyzer.filter("ngram_filter");
+ return analyzer.build();
+ }
+
+ // NGram token filter producing grams of length 3 to 13.
+ private static NGramTokenFilter generateFilter() {
+ NGramTokenFilter.Builder ngram = new NGramTokenFilter.Builder();
+ ngram.minGram(3);
+ ngram.maxGram(13);
+ return ngram.build();
+ }
@RegisterExtension
@@ -133,19 +132,14 @@ class IndexCreationFactoryTest {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
- .customAnalyzers(IndexCreationCustomElement.from("{" +
- " \"my_custom_analyzer\": {" +
- " \"type\": \"custom\"," +
- " \"tokenizer\": \"standard\"," +
- " \"char_filter\": [" +
- " \"html_strip\"" +
- " ]," +
- " \"filter\": [" +
- " \"lowercase\"," +
- " \"asciifolding\"" +
- " ]" +
- " }" +
- "}"))
+ .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+ new Analyzer.Builder()
+ .custom(new CustomAnalyzer.Builder()
+ .tokenizer("standard")
+ .filter("lowercase", "asciifolding")
+ .charFilter("html_strip")
+ .build())
+ .build()))
.createIndexAndAliases(client);
}
@@ -155,14 +149,14 @@ class IndexCreationFactoryTest {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
- .customAnalyzers(IndexCreationCustomElement.from("{" +
- " \"my_custom_analyzer\": {" +
- " \"type\": \"invalid\"," +
- " \"tokenizer\": \"not_Found_tokenizer\"" +
- " }" +
- "}"))
+ .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+ new Analyzer.Builder()
+ .custom(new CustomAnalyzer.Builder()
+ .tokenizer("not_Found_tokenizer")
+ .build())
+ .build()))
.createIndexAndAliases(client))
- .isInstanceOf(OpenSearchStatusException.class);
+ .isInstanceOf(Exception.class);
}
@Test
@@ -170,12 +164,16 @@ class IndexCreationFactoryTest {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
- .customTokenizers(IndexCreationCustomElement.from("{" +
- " \"custom_tokenizer\": { " +
- " \"type\": \"pattern\"," +
- " \"pattern\": \"[ .,!?]\"" +
- " }" +
- " }"))
+ .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+ new Tokenizer.Builder()
+ .definition(new TokenizerDefinition.Builder()
+ .pattern(new PatternTokenizer.Builder()
+ .pattern("[ .,!?]")
+ .flags("CASE_INSENSITIVE|COMMENTS")
+ .group(0)
+ .build())
+ .build())
+ .build()))
.createIndexAndAliases(client);
}
@@ -185,14 +183,16 @@ class IndexCreationFactoryTest {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
- .customTokenizers(IndexCreationCustomElement.from("{" +
- " \"custom_tokenizer\": { " +
- " \"type\": \"invalidType\"," +
- " \"pattern\": \"[ .,!?]\"" +
- " }" +
- " }"))
+ .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+ new Tokenizer.Builder()
+ .definition(new TokenizerDefinition.Builder()
+ .pattern(new PatternTokenizer.Builder()
+ .pattern("[ .,!?]")
+ .build())
+ .build())
+ .build()))
.createIndexAndAliases(client))
- .isInstanceOf(OpenSearchStatusException.class);
+ .isInstanceOf(Exception.class);
}
@Test
@@ -200,25 +200,28 @@ class IndexCreationFactoryTest {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
- .customAnalyzers(IndexCreationCustomElement.from("{" +
- " \"my_custom_analyzer\": { " +
- " \"tokenizer\": \"custom_tokenizer\"," +
- " \"filter\": [" +
- " \"lowercase\"" +
- " ]" +
- " }" +
- " }"))
- .customTokenizers(IndexCreationCustomElement.from("{" +
- " \"custom_tokenizer\": { " +
- " \"type\": \"pattern\"," +
- " \"pattern\": \"[ .,!?]\"" +
- " }" +
- " }"))
+ .customAnalyzers(new IndexCreationCustomAnalyzer("my_custom_analyzer",
+ new Analyzer.Builder()
+ .custom(new CustomAnalyzer.Builder()
+ .tokenizer("custom_tokenizer")
+ .filter("lowercase")
+ .build())
+ .build()))
+ .customTokenizers(new IndexCreationCustomTokenizer("custom_tokenizer",
+ new Tokenizer.Builder()
+ .definition(new TokenizerDefinition.Builder()
+ .pattern(new PatternTokenizer.Builder()
+ .pattern("[ .,!?]")
+ .flags("CASE_INSENSITIVE|COMMENTS")
+ .group(0)
+ .build())
+ .build())
+ .build()))
.createIndexAndAliases(client);
}
@Test
- void customIndexSettingShouldNotThrowWhenValidSetting() throws IOException {
+ void customIndexSettingShouldNotThrowWhenValidSetting() {
new IndexCreationFactory(OpenSearchConfiguration.DEFAULT_CONFIGURATION)
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
@@ -232,7 +235,7 @@ class IndexCreationFactoryTest {
.useIndex(INDEX_NAME)
.addAlias(ALIAS_NAME)
.createIndexAndAliases(client, Optional.of(getInvalidIndexSetting()), Optional.empty()))
- .isInstanceOf(OpenSearchStatusException.class);
+ .isInstanceOf(Exception.class);
}
}
\ No newline at end of file
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
index 5bacd05967..1e40074c7f 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchHealthCheckTest.java
@@ -22,17 +22,32 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.opensearch.cluster.ClusterName;
-import org.opensearch.cluster.ClusterState;
-import org.opensearch.cluster.block.ClusterBlocks;
-import org.opensearch.cluster.health.ClusterHealthStatus;
-import org.opensearch.cluster.node.DiscoveryNodes;
-import org.opensearch.cluster.routing.RoutingTable;
+import org.opensearch.client.opensearch._types.HealthStatus;
+import org.opensearch.client.opensearch.cluster.HealthResponse;
import com.google.common.collect.ImmutableSet;
class OpenSearchHealthCheckTest {
+ /**
+ * Builds a HealthResponse for the "fake-cluster" cluster carrying the given status.
+ * Every other field is a fixed placeholder so the fixture is identical on every run.
+ */
+ private static HealthResponse fakeHealthResponse(HealthStatus status) {
+ return new HealthResponse.Builder()
+ .clusterName("fake-cluster")
+ .activePrimaryShards(0)
+ .activeShards(0)
+ .activeShardsPercentAsNumber("0")
+ .delayedUnassignedShards(0)
+ .initializingShards(0)
+ .numberOfDataNodes(0)
+ .numberOfInFlightFetch(0)
+ .numberOfNodes(0)
+ .numberOfPendingTasks(0)
+ .relocatingShards(0)
+ // Fixed value rather than the wall clock: this field is a wait duration, not a timestamp.
+ .taskMaxWaitingInQueueMillis("0")
+ .timedOut(false)
+ .unassignedShards(0)
+ .status(status)
+ .build();
+ }
+
private OpenSearchHealthCheck healthCheck;
@BeforeEach
@@ -42,39 +57,22 @@ class OpenSearchHealthCheckTest {
@Test
void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsGreen() {
- FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.GREEN);
+ HealthResponse response = fakeHealthResponse(HealthStatus.Green);
assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue();
}
@Test
void checkShouldReturnUnHealthyWhenOpenSearchClusterHealthStatusIsRed() {
- FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.RED);
+ HealthResponse response = fakeHealthResponse(HealthStatus.Red);
assertThat(healthCheck.toHealthCheckResult(response).isUnHealthy()).isTrue();
}
@Test
void checkShouldReturnHealthyWhenOpenSearchClusterHealthStatusIsYellow() {
- FakeClusterHealthResponse response = new FakeClusterHealthResponse(ClusterHealthStatus.YELLOW);
+ HealthResponse response = fakeHealthResponse(HealthStatus.Yellow);
assertThat(healthCheck.toHealthCheckResult(response).isHealthy()).isTrue();
}
-
- private static class FakeClusterHealthResponse extends ClusterHealthResponse {
- private final ClusterHealthStatus status;
-
- private FakeClusterHealthResponse(ClusterHealthStatus clusterHealthStatus) {
- super("fake-cluster", new String[0],
- new ClusterState(new ClusterName("fake-cluster"), 0, null, null, RoutingTable.builder().build(),
- DiscoveryNodes.builder().build(),
- ClusterBlocks.builder().build(), null, 0, false));
- this.status = clusterHealthStatus;
- }
-
- @Override
- public ClusterHealthStatus getStatus() {
- return this.status;
- }
- }
}
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
index 2c8ba75c6e..9af1bd3806 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/OpenSearchIndexerTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
-import static org.opensearch.index.query.QueryBuilders.termQuery;
import java.io.IOException;
@@ -34,12 +33,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.action.get.GetResponse;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.index.query.QueryBuilder;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.FieldValue;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch._types.query_dsl.MatchQuery;
+import org.opensearch.client.opensearch._types.query_dsl.Query;
+import org.opensearch.client.opensearch._types.query_dsl.TermQuery;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.SearchRequest;
import com.google.common.collect.ImmutableList;
@@ -79,32 +79,36 @@ class OpenSearchIndexerTest {
}
@Test
- void indexMessageShouldWork() {
+ void indexMessageShouldWork() throws IOException {
DocumentId documentId = DocumentId.fromString("1");
String content = "{\"message\": \"trying out Elasticsearch\"}";
testee.index(documentId, content, useDocumentId(documentId)).block();
- awaitForOpenSearch(QueryBuilders.matchQuery("message", "trying"), 1L);
+ awaitForOpenSearch(new MatchQuery.Builder()
+ .field("message")
+ .query(new FieldValue.Builder().stringValue("trying").build())
+ .build()
+ ._toQuery(), 1L);
}
@Test
void indexMessageShouldThrowWhenJsonIsNull() {
- assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING))
+ assertThatThrownBy(() -> testee.index(DOCUMENT_ID, null, ROUTING).block())
.isInstanceOf(IllegalArgumentException.class);
}
@Test
- void updateMessages() {
+ void updateMessages() throws IOException {
String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block();
- awaitForOpenSearch(QueryBuilders.matchQuery("message", "mastering"), 1L);
+ awaitForOpenSearch(new MatchQuery.Builder().field("message").query(new FieldValue.Builder().stringValue("mastering").build()).build()._toQuery(), 1L);
- awaitForOpenSearch(QueryBuilders.matchQuery("field", "unchanged"), 1L);
+ awaitForOpenSearch(new MatchQuery.Builder().field("field").query(new FieldValue.Builder().stringValue("unchanged").build()).build()._toQuery(), 1L);
}
@Test
@@ -136,21 +140,21 @@ class OpenSearchIndexerTest {
}
@Test
- void deleteByQueryShouldWorkOnSingleMessage() {
+ void deleteByQueryShouldWorkOnSingleMessage() throws IOException {
DocumentId documentId = DocumentId.fromString("1:2");
String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
RoutingKey routingKey = useDocumentId(documentId);
testee.index(documentId, content, routingKey).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
- testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey).block();
+ testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), routingKey).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L);
}
@Test
- void deleteByQueryShouldWorkWhenMultipleMessages() {
+ void deleteByQueryShouldWorkWhenMultipleMessages() throws IOException {
DocumentId documentId = DocumentId.fromString("1:1");
String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
@@ -165,28 +169,28 @@ class OpenSearchIndexerTest {
String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
testee.index(documentId3, content3, ROUTING).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L);
- testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING).block();
+ testee.deleteAllMatchingQuery(new TermQuery.Builder().field("property").value(new FieldValue.Builder().stringValue("1").build()).build()._toQuery(), ROUTING).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
}
@Test
- void deleteMessage() {
+ void deleteMessage() throws IOException {
DocumentId documentId = DocumentId.fromString("1:2");
String content = "{\"message\": \"trying out Elasticsearch\"}";
testee.index(documentId, content, useDocumentId(documentId)).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 0L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 0L);
}
@Test
- void deleteShouldWorkWhenMultipleMessages() {
+ void deleteShouldWorkWhenMultipleMessages() throws IOException {
DocumentId documentId = DocumentId.fromString("1:1");
String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
testee.index(documentId, content, ROUTING).block();
@@ -199,11 +203,11 @@ class OpenSearchIndexerTest {
String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
testee.index(documentId3, content3, ROUTING).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 3L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 3L);
testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
}
@Test
@@ -219,16 +223,16 @@ class OpenSearchIndexerTest {
}
@Test
- void getShouldWork() {
+ void getShouldWork() throws IOException {
DocumentId documentId = DocumentId.fromString("1");
String content = "{\"message\":\"trying out Elasticsearch\"}";
testee.index(documentId, content, useDocumentId(documentId)).block();
- awaitForOpenSearch(QueryBuilders.matchAllQuery(), 1L);
+ awaitForOpenSearch(new MatchAllQuery.Builder().build()._toQuery(), 1L);
GetResponse getResponse = testee.get(documentId, useDocumentId(documentId)).block();
- assertThat(getResponse.getSourceAsString()).isEqualTo(content);
+ assertThat(getResponse.source().toString()).isEqualTo(content);
}
@Test
@@ -243,13 +247,13 @@ class OpenSearchIndexerTest {
.isInstanceOf(NullPointerException.class);
}
- private void awaitForOpenSearch(QueryBuilder query, long totalHits) {
+ private void awaitForOpenSearch(Query query, long totalHits) {
+ // Polls for at most ten seconds until the query reports exactly totalHits hits on the test index.
CALMLY_AWAIT.atMost(Durations.TEN_SECONDS)
.untilAsserted(() -> assertThat(client.search(
- new SearchRequest(INDEX_NAME.getValue())
- .source(new SearchSourceBuilder().query(query)),
- RequestOptions.DEFAULT)
+ new SearchRequest.Builder()
+ // Keep the search scoped to the test index, as the removed request was.
+ .index(INDEX_NAME.getValue())
+ .query(query)
+ .build())
.block()
- .getHits().getTotalHits().value).isEqualTo(totalHits));
+ .hits().total().value()).isEqualTo(totalHits));
}
}
diff --git a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
index dcd10da200..9e3e32455c 100644
--- a/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
+++ b/backends-common/opensearch/src/test/java/org/apache/james/backends/opensearch/search/ScrolledSearchTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import java.io.IOException;
+import java.util.List;
import org.apache.james.backends.opensearch.DockerOpenSearchExtension;
import org.apache.james.backends.opensearch.IndexCreationFactory;
@@ -36,18 +37,20 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.SearchHit;
-import org.opensearch.search.builder.SearchSourceBuilder;
+import org.opensearch.client.opensearch._types.Time;
+import org.opensearch.client.opensearch._types.query_dsl.MatchAllQuery;
+import org.opensearch.client.opensearch.core.IndexRequest;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+
+import com.fasterxml.jackson.databind.util.RawValue;
class ScrolledSearchTest {
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+ private static final Time TIMEOUT = new Time.Builder()
+ .time("1m")
+ .build();
private static final int SIZE = 2;
- private static final String MESSAGE = "message";
+ private static final String CONTENT = "{\"message\": \"Sample message\"}";
private static final IndexName INDEX_NAME = new IndexName("index");
private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias");
@@ -74,119 +77,129 @@ class ScrolledSearchTest {
@Test
void scrollIterableShouldWorkWhenEmpty() {
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .index(INDEX_NAME.getValue())
.scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .size(SIZE)
+ .build();
assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
.isEmpty();
}
@Test
- void scrollIterableShouldWorkWhenOneElement() throws Exception {
+ void scrollIterableShouldWorkWhenOneElement() throws IOException {
String id = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
- .block();
+ .document(new RawValue(CONTENT))
+ .build())
+ .block();
openSearch.awaitForOpenSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .index(INDEX_NAME.getValue())
.scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .size(SIZE)
+ .build();
assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
- .extracting(SearchHit::getId)
+ .extracting(Hit::id)
.containsOnly(id);
}
@Test
- void scrollIterableShouldWorkWhenSizeElement() {
+ void scrollIterableShouldWorkWhenSizeElement() throws IOException {
String id1 = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id1)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
+ .document(new RawValue(CONTENT))
+ .build())
.block();
String id2 = "2";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id2)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
+ .document(new RawValue(CONTENT))
+ .build())
.block();
openSearch.awaitForOpenSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .index(INDEX_NAME.getValue())
.scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .size(SIZE)
+ .build();
assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
- .extracting(SearchHit::getId)
+ .extracting(Hit::id)
.containsOnly(id1, id2);
}
@Test
- void scrollIterableShouldWorkWhenMoreThanSizeElement() {
+ void scrollIterableShouldWorkWhenMoreThanSizeElement() throws IOException {
String id1 = "1";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id1)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
+ .document(new RawValue(CONTENT))
+ .build())
.block();
String id2 = "2";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id2)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
+ .document(new RawValue(CONTENT))
+ .build())
.block();
String id3 = "3";
- client.index(new IndexRequest(INDEX_NAME.getValue())
+ client.index(new IndexRequest.Builder<>()
+ .index(INDEX_NAME.getValue())
.id(id3)
- .source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT)
+ .document(new RawValue(CONTENT))
+ .build())
.block();
openSearch.awaitForOpenSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .index(INDEX_NAME.getValue())
.scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())
- .size(SIZE));
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .size(SIZE)
+ .build();
assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
- .extracting(SearchHit::getId)
+ .extracting(Hit::id)
.containsOnly(id1, id2, id3);
}
- private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) {
- SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
- .scroll(TIMEOUT)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery()));
+ private void hasIdsInIndex(ReactorOpenSearchClient client, String... ids) throws IOException {
+ SearchRequest searchRequest = new SearchRequest.Builder()
+ .index(INDEX_NAME.getValue())
+ .query(new MatchAllQuery.Builder().build()._toQuery())
+ .build();
- SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
- .block()
- .getHits()
- .getHits();
+ List<String> hitIds = client.search(searchRequest)
+ .flatMapIterable(response -> response.hits().hits())
+ .map(Hit::id)
+ .collectList()
+ .block();
- assertThat(hits)
- .extracting(SearchHit::getId)
+ assertThat(hitIds)
.contains(ids);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org