You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/25 09:50:12 UTC
[james-project] 02/02: WIP JAMES-3771 upgrade to elasticsearch 8.2.1
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch upgrade-es-8.2
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e6ceddfcab35c75aafe167156c24bbc359c46e8f
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed May 25 16:49:32 2022 +0700
WIP JAMES-3771 upgrade to elasticsearch 8.2.1
---
backends-common/elasticsearch-v7/pom.xml | 20 ++-
.../james/backends/es/v7/ClientProvider.java | 34 +++--
.../james/backends/es/v7/ElasticSearchIndexer.java | 18 +--
.../backends/es/v7/ReactorElasticSearchClient.java | 148 +++++++--------------
4 files changed, 89 insertions(+), 131 deletions(-)
diff --git a/backends-common/elasticsearch-v7/pom.xml b/backends-common/elasticsearch-v7/pom.xml
index abeeee5789..0d4b107d6d 100644
--- a/backends-common/elasticsearch-v7/pom.xml
+++ b/backends-common/elasticsearch-v7/pom.xml
@@ -47,6 +47,15 @@
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>co.elastic.clients</groupId>
+ <artifactId>elasticsearch-java</artifactId>
+ <version>8.2.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
<!-- Prevents https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-28491 -->
<dependency>
@@ -84,17 +93,6 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
- <dependency>
- <groupId>org.elasticsearch.client</groupId>
- <artifactId>elasticsearch-rest-high-level-client</artifactId>
- <version>7.10.2</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
index a8b16ffc22..1d9fc96cf2 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
@@ -50,10 +50,13 @@ import org.apache.james.backends.es.v7.ElasticSearchConfiguration.SSLConfigurati
import org.apache.james.backends.es.v7.ElasticSearchConfiguration.SSLConfiguration.SSLValidationStrategy;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
@@ -185,7 +188,8 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
private final ElasticSearchConfiguration configuration;
- private final RestHighLevelClient elasticSearchRestHighLevelClient;
+ private final RestClient lowLevelRestClient;
+ private final ElasticsearchAsyncClient elasticSearchClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
private final ReactorElasticSearchClient client;
@@ -193,15 +197,22 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
public ClientProvider(ElasticSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
- this.elasticSearchRestHighLevelClient = connect(configuration);
- this.client = new ReactorElasticSearchClient(this.elasticSearchRestHighLevelClient);
+ this.lowLevelRestClient = buildRestClient();
+ this.elasticSearchClient = connect();
+ this.client = new ReactorElasticSearchClient(this.elasticSearchClient);
}
- private RestHighLevelClient connect(ElasticSearchConfiguration configuration) {
+ private RestClient buildRestClient() {
+ return RestClient.builder(hostsToHttpHosts())
+ .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
+ .build();
+ }
+
+ private ElasticsearchAsyncClient connect() {
Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
boolean suppressLeadingZeroElements = true;
boolean suppressTrailingZeroElements = true;
- return Mono.fromCallable(() -> connectToCluster(configuration))
+ return Mono.fromCallable(() -> connectToCluster())
.doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}",
DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
.retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.elastic()))
@@ -209,13 +220,12 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
.block();
}
- private RestHighLevelClient connectToCluster(ElasticSearchConfiguration configuration) {
+ private ElasticsearchAsyncClient connectToCluster() {
LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now());
- return new RestHighLevelClient(
- RestClient
- .builder(hostsToHttpHosts())
- .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure));
+ ElasticsearchTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
+
+ return new ElasticsearchAsyncClient(transport);
}
private HttpHost[] hostsToHttpHosts() {
@@ -231,6 +241,6 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
@PreDestroy
public void close() throws IOException {
- elasticSearchRestHighLevelClient.close();
+ lowLevelRestClient.close();
}
}
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
index 552f555f33..fa4904e492 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
@@ -19,6 +19,7 @@
package org.apache.james.backends.es.v7;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -26,8 +27,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.ValidationException;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
import reactor.core.publisher.Mono;
public class ElasticSearchIndexer {
@@ -56,14 +57,15 @@ public class ElasticSearchIndexer {
this.aliasName = aliasName;
}
- public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
+ public Mono<CompletableFuture<IndexResponse>> index(DocumentId id, String content, RoutingKey routingKey) {
checkArgument(content);
logContent(id, content);
- return client.index(new IndexRequest(aliasName.getValue())
- .id(id.asString())
- .source(content, XContentType.JSON)
- .routing(routingKey.asString()),
- RequestOptions.DEFAULT);
+ return client.index(new IndexRequest.Builder<>()
+ .index(aliasName.getValue())
+ .id(id.asString())
+ .withJson(content) // InputStream? Or we could pass directly the object actually with .document() method
+ .routing(routingKey.asString())
+ .build());
}
private void logContent(DocumentId id, String content) {
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
index f24b371a72..e9f5711404 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
@@ -20,56 +20,23 @@
package org.apache.james.backends.es.v7;
import java.io.IOException;
-import java.util.function.Consumer;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
-import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
-import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.explain.ExplainRequest;
-import org.elasticsearch.action.explain.ExplainResponse;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.ClearScrollRequest;
-import org.elasticsearch.action.search.ClearScrollResponse;
-import org.elasticsearch.action.search.MultiSearchRequest;
-import org.elasticsearch.action.search.MultiSearchResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.IndicesClient;
+import java.util.concurrent.CompletableFuture;
+
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.core.MainResponse;
-import org.elasticsearch.index.rankeval.RankEvalRequest;
-import org.elasticsearch.index.rankeval.RankEvalResponse;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
-import org.elasticsearch.script.mustache.MultiSearchTemplateResponse;
-import org.elasticsearch.script.mustache.SearchTemplateRequest;
-import org.elasticsearch.script.mustache.SearchTemplateResponse;
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.ClearScrollRequest;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
-import reactor.core.scheduler.Schedulers;
public class ReactorElasticSearchClient implements AutoCloseable {
- private final RestHighLevelClient client;
+ private final ElasticsearchAsyncClient client;
- public ReactorElasticSearchClient(RestHighLevelClient client) {
+ public ReactorElasticSearchClient(ElasticsearchAsyncClient client) {
this.client = client;
}
@@ -89,28 +56,28 @@ public class ReactorElasticSearchClient implements AutoCloseable {
return toReactor(listener -> client.deleteByQueryAsync(deleteRequest, options, listener));
}
- public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
- return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
- }
-
- public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
- return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
- }
-
- public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
- return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
- }
-
- public RestClient getLowLevelClient() {
- return client.getLowLevelClient();
- }
-
- public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
- return toReactor(listener -> client.getScriptAsync(request, options, listener));
- }
-
- public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
- return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+// public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
+// return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+// }
+//
+// public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
+// return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+// }
+//
+// public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
+// return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+// }
+//
+// public RestClient getLowLevelClient() {
+// return client.getLowLevelClient();
+// }
+//
+// public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
+// return toReactor(listener -> client.getScriptAsync(request, options, listener));
+// }
+
+ public <T> Mono<CompletableFuture<IndexResponse>> index(IndexRequest<T> indexRequest) {
+ return Mono.fromCallable(() -> client.index(indexRequest));
}
public IndicesClient indices() {
@@ -121,17 +88,17 @@ public class ReactorElasticSearchClient implements AutoCloseable {
return client.info(options);
}
- public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
- return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
- }
-
- public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
- return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
- }
-
- public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
- return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
- }
+// public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
+// return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+// }
+//
+// public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
+// return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+// }
+//
+// public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
+// return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+// }
public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
@@ -146,35 +113,16 @@ public class ReactorElasticSearchClient implements AutoCloseable {
.healthAsync(request, RequestOptions.DEFAULT, listener));
}
- public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
- return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
- }
+// public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
+// return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
+// }
public Mono<GetResponse> get(GetRequest getRequest, RequestOptions options) {
- return toReactor(listener -> client.getAsync(getRequest, options, listener));
+ return toReactor(listener -> client.get(getRequest, options, listener));
}
@Override
public void close() throws IOException {
client.close();
}
-
- private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
- return Mono.<T>create(sink -> async.accept(getListener(sink)))
- .publishOn(Schedulers.elastic());
- }
-
- private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
- return new ActionListener<T>() {
- @Override
- public void onResponse(T t) {
- sink.success(t);
- }
-
- @Override
- public void onFailure(Exception e) {
- sink.error(e);
- }
- };
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org