You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/05/25 09:50:12 UTC

[james-project] 02/02: WIP JAMES-3771 upgrade to elasticsearch 8.2.1

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch upgrade-es-8.2
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e6ceddfcab35c75aafe167156c24bbc359c46e8f
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed May 25 16:49:32 2022 +0700

    WIP JAMES-3771 upgrade to elasticsearch 8.2.1
---
 backends-common/elasticsearch-v7/pom.xml           |  20 ++-
 .../james/backends/es/v7/ClientProvider.java       |  34 +++--
 .../james/backends/es/v7/ElasticSearchIndexer.java |  18 +--
 .../backends/es/v7/ReactorElasticSearchClient.java | 148 +++++++--------------
 4 files changed, 89 insertions(+), 131 deletions(-)

diff --git a/backends-common/elasticsearch-v7/pom.xml b/backends-common/elasticsearch-v7/pom.xml
index abeeee5789..0d4b107d6d 100644
--- a/backends-common/elasticsearch-v7/pom.xml
+++ b/backends-common/elasticsearch-v7/pom.xml
@@ -47,6 +47,15 @@
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>co.elastic.clients</groupId>
+            <artifactId>elasticsearch-java</artifactId>
+            <version>8.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
 
         <!-- Prevents https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-28491 -->
         <dependency>
@@ -84,17 +93,6 @@
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.elasticsearch.client</groupId>
-            <artifactId>elasticsearch-rest-high-level-client</artifactId>
-            <version>7.10.2</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
index a8b16ffc22..1d9fc96cf2 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ClientProvider.java
@@ -50,10 +50,13 @@ import org.apache.james.backends.es.v7.ElasticSearchConfiguration.SSLConfigurati
 import org.apache.james.backends.es.v7.ElasticSearchConfiguration.SSLConfiguration.SSLValidationStrategy;
 import org.apache.james.util.concurrent.NamedThreadFactory;
 import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.util.retry.Retry;
@@ -185,7 +188,8 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
 
     private final ElasticSearchConfiguration configuration;
-    private final RestHighLevelClient elasticSearchRestHighLevelClient;
+    private final RestClient lowLevelRestClient;
+    private final ElasticsearchAsyncClient elasticSearchClient;
     private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
     private final ReactorElasticSearchClient client;
 
@@ -193,15 +197,22 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
     public ClientProvider(ElasticSearchConfiguration configuration) {
         this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
         this.configuration = configuration;
-        this.elasticSearchRestHighLevelClient = connect(configuration);
-        this.client = new ReactorElasticSearchClient(this.elasticSearchRestHighLevelClient);
+        this.lowLevelRestClient = buildRestClient();
+        this.elasticSearchClient = connect();
+        this.client = new ReactorElasticSearchClient(this.elasticSearchClient);
     }
 
-    private RestHighLevelClient connect(ElasticSearchConfiguration configuration) {
+    private RestClient buildRestClient() {
+        return RestClient.builder(hostsToHttpHosts())
+            .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure)
+            .build();
+    }
+
+    private ElasticsearchAsyncClient connect() {
         Duration waitDelay = Duration.ofMillis(configuration.getMinDelay());
         boolean suppressLeadingZeroElements = true;
         boolean suppressTrailingZeroElements = true;
-        return Mono.fromCallable(() -> connectToCluster(configuration))
+        return Mono.fromCallable(() -> connectToCluster())
             .doOnError(e -> LOGGER.warn("Error establishing ElasticSearch connection. Next retry scheduled in {}",
                 DurationFormatUtils.formatDurationWords(waitDelay.toMillis(), suppressLeadingZeroElements, suppressTrailingZeroElements), e))
             .retryWhen(Retry.backoff(configuration.getMaxRetries(), waitDelay).scheduler(Schedulers.elastic()))
@@ -209,13 +220,12 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
             .block();
     }
 
-    private RestHighLevelClient connectToCluster(ElasticSearchConfiguration configuration) {
+    private ElasticsearchAsyncClient connectToCluster() {
         LOGGER.info("Trying to connect to ElasticSearch service at {}", LocalDateTime.now());
 
-        return new RestHighLevelClient(
-            RestClient
-                .builder(hostsToHttpHosts())
-                .setHttpClientConfigCallback(httpAsyncClientConfigurer::configure));
+        ElasticsearchTransport transport = new RestClientTransport(lowLevelRestClient, new JacksonJsonpMapper());
+
+        return new ElasticsearchAsyncClient(transport);
     }
 
     private HttpHost[] hostsToHttpHosts() {
@@ -231,6 +241,6 @@ public class ClientProvider implements Provider<ReactorElasticSearchClient> {
 
     @PreDestroy
     public void close() throws IOException {
-        elasticSearchRestHighLevelClient.close();
+        lowLevelRestClient.close();
     }
 }
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
index 552f555f33..fa4904e492 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ElasticSearchIndexer.java
@@ -19,6 +19,7 @@
 package org.apache.james.backends.es.v7;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -26,8 +27,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.common.ValidationException;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchIndexer {
@@ -56,14 +57,15 @@ public class ElasticSearchIndexer {
         this.aliasName = aliasName;
     }
 
-    public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
+    public Mono<CompletableFuture<IndexResponse>> index(DocumentId id, String content, RoutingKey routingKey) {
         checkArgument(content);
         logContent(id, content);
-        return client.index(new IndexRequest(aliasName.getValue())
-                .id(id.asString())
-                .source(content, XContentType.JSON)
-                .routing(routingKey.asString()),
-            RequestOptions.DEFAULT);
+        return client.index(new IndexRequest.Builder<>()
+            .index(aliasName.getValue())
+            .id(id.asString())
+            .withJson(content) // InputStream? Or we could pass directly the object actually with .document() method
+            .routing(routingKey.asString())
+            .build());
     }
 
     private void logContent(DocumentId id, String content) {
diff --git a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
index f24b371a72..e9f5711404 100644
--- a/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
+++ b/backends-common/elasticsearch-v7/src/main/java/org/apache/james/backends/es/v7/ReactorElasticSearchClient.java
@@ -20,56 +20,23 @@
 package org.apache.james.backends.es.v7;
 
 import java.io.IOException;
-import java.util.function.Consumer;
-
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
-import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
-import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.explain.ExplainRequest;
-import org.elasticsearch.action.explain.ExplainResponse;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
-import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.ClearScrollRequest;
-import org.elasticsearch.action.search.ClearScrollResponse;
-import org.elasticsearch.action.search.MultiSearchRequest;
-import org.elasticsearch.action.search.MultiSearchResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.IndicesClient;
+import java.util.concurrent.CompletableFuture;
+
 import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.core.MainResponse;
-import org.elasticsearch.index.rankeval.RankEvalRequest;
-import org.elasticsearch.index.rankeval.RankEvalResponse;
-import org.elasticsearch.index.reindex.BulkByScrollResponse;
-import org.elasticsearch.index.reindex.DeleteByQueryRequest;
-import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
-import org.elasticsearch.script.mustache.MultiSearchTemplateResponse;
-import org.elasticsearch.script.mustache.SearchTemplateRequest;
-import org.elasticsearch.script.mustache.SearchTemplateResponse;
 
+import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.ClearScrollRequest;
+import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
+import co.elastic.clients.elasticsearch.core.DeleteRequest;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.IndexResponse;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoSink;
-import reactor.core.scheduler.Schedulers;
 
 public class ReactorElasticSearchClient implements AutoCloseable {
-    private final RestHighLevelClient client;
+    private final ElasticsearchAsyncClient client;
 
-    public ReactorElasticSearchClient(RestHighLevelClient client) {
+    public ReactorElasticSearchClient(ElasticsearchAsyncClient client) {
         this.client = client;
     }
 
@@ -89,28 +56,28 @@ public class ReactorElasticSearchClient implements AutoCloseable {
         return toReactor(listener -> client.deleteByQueryAsync(deleteRequest, options, listener));
     }
 
-    public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
-        return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
-    }
-
-    public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
-        return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
-    }
-
-    public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
-        return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
-    }
-
-    public RestClient getLowLevelClient() {
-        return client.getLowLevelClient();
-    }
-
-    public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
-        return toReactor(listener -> client.getScriptAsync(request, options, listener));
-    }
-
-    public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
-        return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+//    public Mono<AcknowledgedResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
+//        return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+//    }
+//
+//    public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
+//        return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+//    }
+//
+//    public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
+//        return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+//    }
+//
+//    public RestClient getLowLevelClient() {
+//        return client.getLowLevelClient();
+//    }
+//
+//    public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
+//        return toReactor(listener -> client.getScriptAsync(request, options, listener));
+//    }
+
+    public <T> Mono<CompletableFuture<IndexResponse>> index(IndexRequest<T> indexRequest) {
+        return Mono.fromCallable(() -> client.index(indexRequest));
     }
 
     public IndicesClient indices() {
@@ -121,17 +88,17 @@ public class ReactorElasticSearchClient implements AutoCloseable {
         return client.info(options);
     }
 
-    public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
-        return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
-    }
-
-    public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
-        return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
-    }
-
-    public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
-        return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
-    }
+//    public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
+//        return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+//    }
+//
+//    public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
+//        return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+//    }
+//
+//    public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
+//        return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+//    }
 
     public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
         return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
@@ -146,35 +113,16 @@ public class ReactorElasticSearchClient implements AutoCloseable {
             .healthAsync(request, RequestOptions.DEFAULT, listener));
     }
 
-    public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
-        return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
-    }
+//    public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
+//        return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
+//    }
 
     public Mono<GetResponse> get(GetRequest getRequest, RequestOptions options) {
-        return toReactor(listener -> client.getAsync(getRequest, options, listener));
+        return toReactor(listener -> client.get(getRequest, options, listener));
     }
 
     @Override
     public void close() throws IOException {
         client.close();
     }
-
-    private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
-        return Mono.<T>create(sink -> async.accept(getListener(sink)))
-            .publishOn(Schedulers.elastic());
-    }
-
-    private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
-        return new ActionListener<T>() {
-            @Override
-            public void onResponse(T t) {
-                sink.success(t);
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                sink.error(e);
-            }
-        };
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org