You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/08 02:25:48 UTC
[james-project] 03/06: JAMES-3144 Use ElasticSearch reactively
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 97e06e9ccafa1d5a8869fdcc2b5ead42564a3c90
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 13 16:05:21 2020 +0100
JAMES-3144 Use ElasticSearch reactively
---
.../apache/james/backends/es/ClientProvider.java | 12 +-
.../james/backends/es/DeleteByQueryPerformer.java | 17 +-
.../backends/es/ElasticSearchHealthCheck.java | 5 +-
.../james/backends/es/ElasticSearchIndexer.java | 67 ++++----
.../james/backends/es/IndexCreationFactory.java | 11 +-
.../apache/james/backends/es/ListenerToFuture.java | 51 ------
.../james/backends/es/NodeMappingFactory.java | 7 +-
.../backends/es/ReactorElasticSearchClient.java | 171 +++++++++++++++++++++
.../james/backends/es/search/ScrolledSearch.java | 113 ++++++--------
.../es/ClientProviderImplConnectionContract.java | 5 +-
.../es/ElasticSearchHealthCheckConnectionTest.java | 3 +-
.../backends/es/ElasticSearchIndexerTest.java | 72 +++++----
.../backends/es/IndexCreationFactoryTest.java | 3 +-
.../backends/es/NodeMappingFactoryAuthTest.java | 3 +-
.../james/backends/es/NodeMappingFactoryTest.java | 3 +-
.../backends/es/search/ScrolledSearchTest.java | 37 +++--
.../elasticsearch/MailboxIndexCreationUtil.java | 14 +-
.../ElasticSearchListeningMessageSearchIndex.java | 38 +++--
.../search/ElasticSearchSearcher.java | 23 +--
.../ElasticSearchIntegrationTest.java | 4 +-
...asticSearchListeningMessageSearchIndexTest.java | 10 +-
.../search/ElasticSearchSearcherTest.java | 4 +-
.../elasticsearch/ElasticSearchQuotaSearcher.java | 35 ++---
.../QuotaSearchIndexCreationUtil.java | 6 +-
.../events/ElasticSearchQuotaMailboxListener.java | 8 +-
...lasticSearchQuotaSearchTestSystemExtension.java | 4 +-
.../ElasticSearchQuotaMailboxListenerTest.java | 13 +-
.../host/ElasticSearchHostSystem.java | 4 +-
.../modules/mailbox/ElasticSearchClientModule.java | 4 +-
.../mailbox/ElasticSearchMailboxModule.java | 10 +-
.../mailbox/ElasticSearchQuotaSearcherModule.java | 10 +-
.../modules/mailbox/ElasticSearchStartUpCheck.java | 6 +-
.../test/java/org/apache/james/ESReporterTest.java | 5 +-
.../routes/ElasticSearchQuotaSearchExtension.java | 4 +-
34 files changed, 450 insertions(+), 332 deletions(-)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
index b879a88..247ed50 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java
@@ -57,7 +57,7 @@ import com.google.common.annotations.VisibleForTesting;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
-public class ClientProvider implements Provider<RestHighLevelClient> {
+public class ClientProvider implements Provider<ReactorElasticSearchClient> {
private static class HttpAsyncClientConfigurer {
@@ -165,15 +165,17 @@ public class ClientProvider implements Provider<RestHighLevelClient> {
private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class);
private final ElasticSearchConfiguration configuration;
- private final RestHighLevelClient client;
+ private final RestHighLevelClient elasticSearchRestHighLevelClient;
private final HttpAsyncClientConfigurer httpAsyncClientConfigurer;
+ private final ReactorElasticSearchClient client;
@Inject
@VisibleForTesting
ClientProvider(ElasticSearchConfiguration configuration) {
this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration);
this.configuration = configuration;
- this.client = connect(configuration);
+ this.elasticSearchRestHighLevelClient = connect(configuration);
+ this.client = new ReactorElasticSearchClient(this.elasticSearchRestHighLevelClient);
}
private RestHighLevelClient connect(ElasticSearchConfiguration configuration) {
@@ -206,12 +208,12 @@ public class ClientProvider implements Provider<RestHighLevelClient> {
}
@Override
- public RestHighLevelClient get() {
+ public ReactorElasticSearchClient get() {
return client;
}
@PreDestroy
public void close() throws IOException {
- client.close();
+ elasticSearchRestHighLevelClient.close();
}
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
index 66ad682..5277694 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java
@@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
@@ -34,18 +33,17 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import com.google.common.annotations.VisibleForTesting;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class DeleteByQueryPerformer {
private static final TimeValue TIMEOUT = new TimeValue(60000);
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
private final int batchSize;
private final WriteAliasName aliasName;
@VisibleForTesting
- DeleteByQueryPerformer(RestHighLevelClient client, int batchSize, WriteAliasName aliasName) {
+ DeleteByQueryPerformer(ReactorElasticSearchClient client, int batchSize, WriteAliasName aliasName) {
this.client = client;
this.batchSize = batchSize;
this.aliasName = aliasName;
@@ -54,9 +52,10 @@ public class DeleteByQueryPerformer {
public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) {
SearchRequest searchRequest = prepareSearch(queryBuilder, routingKey);
- return Flux.fromStream(new ScrolledSearch(client, searchRequest).searchResponses())
- .flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse, routingKey))
- .thenEmpty(Mono.empty());
+ return new ScrolledSearch(client, searchRequest).searchResponses()
+ .filter(searchResponse -> searchResponse.getHits().getHits().length > 0)
+ .flatMap(searchResponse -> deleteRetrievedIds(searchResponse, routingKey))
+ .then();
}
private SearchRequest prepareSearch(QueryBuilder queryBuilder, RoutingKey routingKey) {
@@ -73,7 +72,7 @@ public class DeleteByQueryPerformer {
.size(batchSize);
}
- private Mono<BulkResponse> deleteRetrievedIds(RestHighLevelClient client, SearchResponse searchResponse, RoutingKey routingKey) {
+ private Mono<BulkResponse> deleteRetrievedIds(SearchResponse searchResponse, RoutingKey routingKey) {
BulkRequest request = new BulkRequest();
for (SearchHit hit : searchResponse.getHits()) {
@@ -84,6 +83,6 @@ public class DeleteByQueryPerformer {
.routing(routingKey.asString()));
}
- return Mono.fromCallable(() -> client.bulk(request, RequestOptions.DEFAULT));
+ return client.bulk(request, RequestOptions.DEFAULT);
}
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
index 1305420..58cd813 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
@@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,10 +43,10 @@ public class ElasticSearchHealthCheck implements HealthCheck {
private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend");
private final Set<IndexName> indexNames;
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
@Inject
- ElasticSearchHealthCheck(RestHighLevelClient client, Set<IndexName> indexNames) {
+ ElasticSearchHealthCheck(ReactorElasticSearchClient client, Set<IndexName> indexNames) {
this.client = client;
this.indexNames = indexNames;
}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
index 99a9f80..0fcbc11 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java
@@ -18,9 +18,7 @@
****************************************************************/
package org.apache.james.backends.es;
-import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
@@ -30,7 +28,6 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
@@ -40,23 +37,25 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import reactor.core.publisher.Mono;
+
public class ElasticSearchIndexer {
private static final int DEBUG_MAX_LENGTH_CONTENT = 1000;
private static final int DEFAULT_BATCH_SIZE = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class);
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
private final AliasName aliasName;
private final DeleteByQueryPerformer deleteByQueryPerformer;
- public ElasticSearchIndexer(RestHighLevelClient client,
+ public ElasticSearchIndexer(ReactorElasticSearchClient client,
WriteAliasName aliasName) {
this(client, aliasName, DEFAULT_BATCH_SIZE);
}
@VisibleForTesting
- public ElasticSearchIndexer(RestHighLevelClient client,
+ public ElasticSearchIndexer(ReactorElasticSearchClient client,
WriteAliasName aliasName,
int batchSize) {
this.client = client;
@@ -64,7 +63,7 @@ public class ElasticSearchIndexer {
this.aliasName = aliasName;
}
- public IndexResponse index(DocumentId id, String content, RoutingKey routingKey) throws IOException {
+ public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) {
checkArgument(content);
logContent(id, content);
return client.index(new IndexRequest(aliasName.getValue())
@@ -81,37 +80,37 @@ public class ElasticSearchIndexer {
}
}
- public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) throws IOException {
- try {
- Preconditions.checkNotNull(updatedDocumentParts);
- Preconditions.checkNotNull(routingKey);
- BulkRequest request = new BulkRequest();
- updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
- new UpdateRequest(aliasName.getValue(),
- NodeMappingFactory.DEFAULT_MAPPING_NAME,
- updatedDocumentPart.getId().asString())
+ public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) {
+ Preconditions.checkNotNull(updatedDocumentParts);
+ Preconditions.checkNotNull(routingKey);
+ BulkRequest request = new BulkRequest();
+ updatedDocumentParts.forEach(updatedDocumentPart -> request.add(
+ new UpdateRequest(aliasName.getValue(),
+ NodeMappingFactory.DEFAULT_MAPPING_NAME,
+ updatedDocumentPart.getId().asString())
.doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON)
.routing(routingKey.asString())));
- return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
- } catch (ValidationException e) {
- LOGGER.warn("Error while updating index", e);
- return Optional.empty();
- }
+
+ return client.bulk(request, RequestOptions.DEFAULT)
+ .onErrorResume(ValidationException.class, exception -> {
+ LOGGER.warn("Error while updating index", exception);
+ return Mono.empty();
+ });
}
- public Optional<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) throws IOException {
- try {
- BulkRequest request = new BulkRequest();
- ids.forEach(id -> request.add(
- new DeleteRequest(aliasName.getValue())
- .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .id(id.asString())
- .routing(routingKey.asString())));
- return Optional.of(client.bulk(request, RequestOptions.DEFAULT));
- } catch (ValidationException e) {
- LOGGER.warn("Error while deleting index", e);
- return Optional.empty();
- }
+ public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) {
+ BulkRequest request = new BulkRequest();
+ ids.forEach(id -> request.add(
+ new DeleteRequest(aliasName.getValue())
+ .type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .id(id.asString())
+ .routing(routingKey.asString())));
+
+ return client.bulk(request, RequestOptions.DEFAULT)
+ .onErrorResume(ValidationException.class, exception -> {
+ LOGGER.warn("Error while deleting index", exception);
+ return Mono.empty();
+ });
}
public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) {
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
index 9d6f834..07e41e8 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java
@@ -31,7 +31,6 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +62,7 @@ public class IndexCreationFactory {
return this;
}
- public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+ public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) {
return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build()).createIndexAndAliases(client);
}
}
@@ -83,7 +82,7 @@ public class IndexCreationFactory {
this.aliases = aliases;
}
- public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) {
+ public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) {
Preconditions.checkNotNull(indexName);
try {
createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica, waitForActiveShards));
@@ -95,7 +94,7 @@ public class IndexCreationFactory {
return client;
}
- private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException {
+ private void createAliasIfNeeded(ReactorElasticSearchClient client, IndexName indexName, AliasName aliasName) throws IOException {
if (!aliasExist(client, aliasName)) {
client.indices()
.updateAliases(
@@ -107,12 +106,12 @@ public class IndexCreationFactory {
}
}
- private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException {
+ private boolean aliasExist(ReactorElasticSearchClient client, AliasName aliasName) throws IOException {
return client.indices()
.existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT);
}
- private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException {
+ private void createIndexIfNeeded(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder settings) throws IOException {
try {
client.indices()
.create(
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
deleted file mode 100644
index 1df57b0..0000000
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-
-package org.apache.james.backends.es;
-
-import java.util.concurrent.CompletableFuture;
-
-import org.elasticsearch.action.ActionListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ListenerToFuture<T> implements ActionListener<T> {
- private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class);
-
- private CompletableFuture<T> future;
-
- public ListenerToFuture() {
- this.future = new CompletableFuture<>();
- }
-
- @Override
- public void onResponse(T t) {
- future.complete(t);
- }
-
- @Override
- public void onFailure(Exception e) {
- LOGGER.warn("Error while waiting ElasticSearch query execution: ", e);
- future.completeExceptionally(e);
- }
-
- public CompletableFuture<T> getFuture() {
- return future;
- }
-}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
index f4359f4..25b82e7 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java
@@ -25,7 +25,6 @@ import org.apache.http.HttpStatus;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
public class NodeMappingFactory {
@@ -55,7 +54,7 @@ public class NodeMappingFactory {
public static final String SNOWBALL = "snowball";
public static final String IGNORE_ABOVE = "ignore_above";
- public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+ public static ReactorElasticSearchClient applyMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
if (!mappingAlreadyExist(client, indexName)) {
createMapping(client, indexName, mappingsSources);
}
@@ -64,7 +63,7 @@ public class NodeMappingFactory {
// ElasticSearch 6.3.2 does not support field master_timeout that is set up by 6.4.3 REST client when relying on getMapping
@SuppressWarnings("deprecation")
- public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName) throws IOException {
+ public static boolean mappingAlreadyExist(ReactorElasticSearchClient client, IndexName indexName) throws IOException {
try {
client.getLowLevelClient().performRequest("GET", "/" + indexName.getValue() + "/_mapping/" + NodeMappingFactory.DEFAULT_MAPPING_NAME);
return true;
@@ -76,7 +75,7 @@ public class NodeMappingFactory {
return false;
}
- public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
+ public static void createMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException {
client.indices().putMapping(
new PutMappingRequest(indexName.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
new file mode 100644
index 0000000..3b0c18b
--- /dev/null
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.backends.es;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
+import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
+import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
+import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.explain.ExplainRequest;
+import org.elasticsearch.action.explain.ExplainResponse;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.main.MainResponse;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.MultiSearchRequest;
+import org.elasticsearch.action.search.MultiSearchResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.ClusterClient;
+import org.elasticsearch.client.IndicesClient;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.rankeval.RankEvalRequest;
+import org.elasticsearch.index.rankeval.RankEvalResponse;
+import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
+import org.elasticsearch.script.mustache.MultiSearchTemplateResponse;
+import org.elasticsearch.script.mustache.SearchTemplateRequest;
+import org.elasticsearch.script.mustache.SearchTemplateResponse;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.core.scheduler.Schedulers;
+
+public class ReactorElasticSearchClient implements AutoCloseable {
+ private final RestHighLevelClient client;
+
+ public ReactorElasticSearchClient(RestHighLevelClient client) {
+ this.client = client;
+ }
+
+ public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) {
+ return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener));
+ }
+
+ public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) {
+ return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
+ }
+
+ public ClusterClient cluster() {
+ return client.cluster();
+ }
+
+ public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
+ return client.delete(deleteRequest, options);
+ }
+
+ public Mono<DeleteStoredScriptResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) {
+ return toReactor(listener -> client.deleteScriptAsync(request, options, listener));
+ }
+
+ public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) {
+ return toReactor(listener -> client.explainAsync(explainRequest, options, listener));
+ }
+
+ public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) {
+ return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener));
+ }
+
+ public RestClient getLowLevelClient() {
+ return client.getLowLevelClient();
+ }
+
+ public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) {
+ return toReactor(listener -> client.getScriptAsync(request, options, listener));
+ }
+
+ public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) {
+ return toReactor(listener -> client.indexAsync(indexRequest, options, listener));
+ }
+
+ public IndicesClient indices() {
+ return client.indices();
+ }
+
+ public MainResponse info(RequestOptions options) throws IOException {
+ return client.info(options);
+ }
+
+ public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) {
+ return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener));
+ }
+
+ public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) {
+ return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener));
+ }
+
+ public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) {
+ return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener));
+ }
+
+ public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) {
+ return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener));
+ }
+
+ @Deprecated
+ public Mono<SearchResponse> search(SearchRequest searchRequest) {
+ return toReactor(listener -> client.searchAsync(searchRequest, listener));
+ }
+
+ public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) {
+ return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
+ }
+
+ public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
+ return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+
+ private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) {
+ return Mono.<T>create(sink -> async.accept(getListener(sink)))
+ .subscribeOn(Schedulers.elastic());
+ }
+
+ private static <T> ActionListener<T> getListener(MonoSink<T> sink) {
+ return new ActionListener<T>() {
+ @Override
+ public void onResponse(T t) {
+ sink.success(t);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ sink.error(e);
+ }
+ };
+ }
+}
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
index 834f801..10146ea 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
@@ -19,94 +19,79 @@
package org.apache.james.backends.es.search;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Stream;
-
-import org.apache.james.backends.es.ListenerToFuture;
-import org.apache.james.util.streams.Iterators;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
-import com.github.fge.lambdas.Throwing;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class ScrolledSearch {
- private static class ScrollIterator implements Iterator<SearchResponse>, Closeable {
- private final RestHighLevelClient client;
- private CompletableFuture<SearchResponse> searchResponseFuture;
+ private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
- ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) {
- this.client = client;
- ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
- client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener);
+ private final ReactorElasticSearchClient client;
+ private final SearchRequest searchRequest;
- this.searchResponseFuture = listener.getFuture();
- }
+ public ScrolledSearch(ReactorElasticSearchClient client, SearchRequest searchRequest) {
+ this.client = client;
+ this.searchRequest = searchRequest;
+ }
- @Override
- public void close() throws IOException {
- ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
- clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId());
- client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
- }
- @Override
- public boolean hasNext() {
- SearchResponse join = searchResponseFuture.join();
- return !allSearchResponsesConsumed(join);
- }
+ public Flux<SearchHit> searchHits() {
+ return searchResponses()
+ .flatMap(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits()));
+ }
- @Override
- public SearchResponse next() {
- SearchResponse result = searchResponseFuture.join();
- ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>();
- client.scrollAsync(
- new SearchScrollRequest()
- .scrollId(result.getScrollId())
- .scroll(TIMEOUT),
- RequestOptions.DEFAULT,
- listener);
- searchResponseFuture = listener.getFuture();
- return result;
- }
+ public Flux<SearchResponse> searchResponses() {
+ return ensureClosing(Flux.from(startScrolling(searchRequest))
+ .expand(this::nextResponse));
+ }
- public Stream<SearchResponse> stream() {
- return Iterators.toStream(this)
- .onClose(Throwing.runnable(this::close));
- }
+ private Mono<SearchResponse> startScrolling(SearchRequest searchRequest) {
+ return client.search(searchRequest, RequestOptions.DEFAULT);
+ }
- private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
- return searchResponse.getHits().getHits().length == 0;
+ public Mono<SearchResponse> nextResponse(SearchResponse previous) {
+ if (allSearchResponsesConsumed(previous)) {
+ return Mono.empty();
}
- }
- private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
+ return client.scroll(
+ new SearchScrollRequest()
+ .scrollId(previous.getScrollId())
+ .scroll(TIMEOUT),
+ RequestOptions.DEFAULT);
+ }
- private final RestHighLevelClient client;
- private final SearchRequest searchRequest;
+ private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
+ return searchResponse.getHits().getHits().length == 0;
+ }
- public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) {
- this.client = client;
- this.searchRequest = searchRequest;
+ private Flux<SearchResponse> ensureClosing(Flux<SearchResponse> origin) {
+ AtomicReference<SearchResponse> latest = new AtomicReference<>();
+ return origin
+ .doOnNext(latest::set)
+ .doOnTerminate(close(latest));
}
- public Stream<SearchHit> searchHits() {
- return searchResponses()
- .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits()));
+ public Runnable close(AtomicReference<SearchResponse> latest) {
+ return () -> Optional.ofNullable(latest.getAndSet(null)).map(this::clearScroll);
}
- @SuppressWarnings("resource")
- public Stream<SearchResponse> searchResponses() {
- return new ScrollIterator(client, searchRequest)
- .stream();
+ private Disposable clearScroll(SearchResponse current) {
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(current.getScrollId());
+
+ return client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe();
}
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
index 05a7ec4..4a4328c 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java
@@ -25,7 +25,6 @@ import org.apache.james.backends.es.ElasticSearchClusterExtension.ElasticSearchC
import org.awaitility.Awaitility;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
@@ -75,11 +74,11 @@ interface ClientProviderImplConnectionContract {
}
default boolean isConnected(ClientProvider clientProvider) {
- try (RestHighLevelClient client = clientProvider.get()) {
+ try (ReactorElasticSearchClient client = clientProvider.get()) {
client.search(
new SearchRequest()
.source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT).block();
return true;
} catch (Exception e) {
LOGGER.info("Caught exception while trying to connect", e);
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
index 51e1b2e..0f5ddd6 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java
@@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,7 +37,7 @@ class ElasticSearchHealthCheckConnectionTest {
@BeforeEach
void setUp() {
- RestHighLevelClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get();
+ ReactorElasticSearchClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get();
elasticSearchHealthCheck = new ElasticSearchHealthCheck(client, ImmutableSet.of());
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
index c7372a5..f5f8a40 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java
@@ -33,7 +33,6 @@ import org.awaitility.core.ConditionFactory;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.AfterEach;
@@ -62,7 +61,7 @@ class ElasticSearchIndexerTest {
@RegisterExtension
public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
private ElasticSearchIndexer testee;
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@BeforeEach
void setup() {
@@ -80,17 +79,18 @@ class ElasticSearchIndexerTest {
}
@Test
- void indexMessageShouldWork() throws Exception {
+ void indexMessageShouldWork() {
DocumentId documentId = DocumentId.fromString("1");
String content = "{\"message\": \"trying out Elasticsearch\"}";
- testee.index(documentId, content, useDocumentId(documentId));
+ testee.index(documentId, content, useDocumentId(documentId)).block();
elasticSearch.awaitForElasticSearch();
-
+
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying"))),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
@@ -101,64 +101,66 @@ class ElasticSearchIndexerTest {
}
@Test
- void updateMessages() throws Exception {
+ void updateMessages() {
String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}";
- testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID));
+ testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block();
elasticSearch.awaitForElasticSearch();
- testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID));
+ testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block();
elasticSearch.awaitForElasticSearch();
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering"))),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
SearchResponse searchResponse2 = client.search(
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged"))),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
assertThat(searchResponse2.getHits().getTotalHits()).isEqualTo(1);
}
@Test
void updateMessageShouldThrowWhenJsonIsNull() {
assertThatThrownBy(() -> testee.update(ImmutableList.of(
- new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING))
+ new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING).block())
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void updateMessageShouldThrowWhenIdIsNull() {
assertThatThrownBy(() -> testee.update(ImmutableList.of(
- new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING))
+ new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING).block())
.isInstanceOf(NullPointerException.class);
}
@Test
void updateMessageShouldThrowWhenJsonIsEmpty() {
assertThatThrownBy(() -> testee.update(ImmutableList.of(
- new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING))
+ new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING).block())
.isInstanceOf(IllegalArgumentException.class);
}
@Test
void updateMessageShouldThrowWhenRoutingKeyIsNull() {
assertThatThrownBy(() -> testee.update(ImmutableList.of(
- new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null))
+ new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null).block())
.isInstanceOf(NullPointerException.class);
}
@Test
- void deleteByQueryShouldWorkOnSingleMessage() throws Exception {
+ void deleteByQueryShouldWorkOnSingleMessage() {
DocumentId documentId = DocumentId.fromString("1:2");
String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
RoutingKey routingKey = useDocumentId(documentId);
- testee.index(documentId, content, routingKey);
+ testee.index(documentId, content, routingKey).block();
elasticSearch.awaitForElasticSearch();
testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey);
@@ -169,25 +171,26 @@ class ElasticSearchIndexerTest {
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT)
+ .block()
.getHits().getTotalHits() == 0);
}
@Test
- void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception {
+ void deleteByQueryShouldWorkWhenMultipleMessages() {
DocumentId documentId = DocumentId.fromString("1:1");
String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}";
- testee.index(documentId, content, ROUTING);
+ testee.index(documentId, content, ROUTING).block();
DocumentId documentId2 = DocumentId.fromString("1:2");
String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}";
- testee.index(documentId2, content2, ROUTING);
+ testee.index(documentId2, content2, ROUTING).block();
DocumentId documentId3 = DocumentId.fromString("2:3");
String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}";
- testee.index(documentId3, content3, ROUTING);
+ testee.index(documentId3, content3, ROUTING).block();
elasticSearch.awaitForElasticSearch();
testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING);
@@ -198,64 +201,67 @@ class ElasticSearchIndexerTest {
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
RequestOptions.DEFAULT)
+ .block()
.getHits().getTotalHits() == 1);
}
@Test
- void deleteMessage() throws Exception {
+ void deleteMessage() {
DocumentId documentId = DocumentId.fromString("1:2");
String content = "{\"message\": \"trying out Elasticsearch\"}";
- testee.index(documentId, content, useDocumentId(documentId));
+ testee.index(documentId, content, useDocumentId(documentId)).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(ImmutableList.of(documentId), useDocumentId(documentId));
+ testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block();
elasticSearch.awaitForElasticSearch();
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0);
}
@Test
- void deleteShouldWorkWhenMultipleMessages() throws Exception {
+ void deleteShouldWorkWhenMultipleMessages() {
DocumentId documentId = DocumentId.fromString("1:1");
String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}";
- testee.index(documentId, content, ROUTING);
+ testee.index(documentId, content, ROUTING).block();
DocumentId documentId2 = DocumentId.fromString("1:2");
String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}";
- testee.index(documentId2, content2, ROUTING);
+ testee.index(documentId2, content2, ROUTING).block();
DocumentId documentId3 = DocumentId.fromString("2:3");
String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}";
- testee.index(documentId3, content3, ROUTING);
+ testee.index(documentId3, content3, ROUTING).block();
elasticSearch.awaitForElasticSearch();
- testee.delete(ImmutableList.of(documentId, documentId3), ROUTING);
+ testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block();
elasticSearch.awaitForElasticSearch();
SearchResponse searchResponse = client.search(
new SearchRequest(INDEX_NAME.getValue())
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
@Test
void updateMessagesShouldNotThrowWhenEmptyList() {
- assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING))
+ assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING).block())
.doesNotThrowAnyException();
}
@Test
void deleteMessagesShouldNotThrowWhenEmptyList() {
- assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING))
+ assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING).block())
.doesNotThrowAnyException();
}
}
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
index 0862711..6ff2f7e 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java
@@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.IOException;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,7 +34,7 @@ class IndexCreationFactoryTest {
@RegisterExtension
public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@BeforeEach
void setUp() {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
index dddd34a..8c29fca 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java
@@ -25,7 +25,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
import java.util.Optional;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -42,7 +41,7 @@ class NodeMappingFactoryAuthTest {
DockerAuthElasticSearchSingleton.INSTANCE,
new DockerElasticSearch.WithAuth()));
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@BeforeEach
void setUp(ElasticSearchClusterExtension.ElasticSearchCluster esCluster) throws Exception {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
index 7d2f52e..e686473 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java
@@ -24,7 +24,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import java.io.IOException;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -38,7 +37,7 @@ class NodeMappingFactoryTest {
@RegisterExtension
public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@BeforeEach
void setUp() throws Exception {
diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
index c4e9e2b..e07368e 100644
--- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
+++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java
@@ -29,13 +29,13 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.IndexCreationFactory;
import org.apache.james.backends.es.IndexName;
import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.backends.es.ReadAliasName;
import org.awaitility.Duration;
import org.awaitility.core.ConditionFactory;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
@@ -56,7 +56,7 @@ class ScrolledSearchTest {
@RegisterExtension
public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@BeforeEach
void setUp() {
@@ -81,7 +81,7 @@ class ScrolledSearchTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
.isEmpty();
}
@@ -92,7 +92,8 @@ class ScrolledSearchTest {
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id));
@@ -103,26 +104,28 @@ class ScrolledSearchTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
.extracting(SearchHit::getId)
.containsOnly(id);
}
@Test
- void scrollIterableShouldWorkWhenSizeElement() throws Exception {
+ void scrollIterableShouldWorkWhenSizeElement() {
String id1 = "1";
client.index(new IndexRequest(INDEX_NAME.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id1)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
String id2 = "2";
client.index(new IndexRequest(INDEX_NAME.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id2)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2));
@@ -133,33 +136,36 @@ class ScrolledSearchTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
.extracting(SearchHit::getId)
.containsOnly(id1, id2);
}
@Test
- void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception {
+ void scrollIterableShouldWorkWhenMoreThanSizeElement() {
String id1 = "1";
client.index(new IndexRequest(INDEX_NAME.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id1)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
String id2 = "2";
client.index(new IndexRequest(INDEX_NAME.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id2)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
String id3 = "3";
client.index(new IndexRequest(INDEX_NAME.getValue())
.type(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.id(id3)
.source(MESSAGE, "Sample message"),
- RequestOptions.DEFAULT);
+ RequestOptions.DEFAULT)
+ .block();
elasticSearch.awaitForElasticSearch();
WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3));
@@ -170,18 +176,19 @@ class ScrolledSearchTest {
.query(QueryBuilders.matchAllQuery())
.size(SIZE));
- assertThat(new ScrolledSearch(client, searchRequest).searchHits())
+ assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block())
.extracting(SearchHit::getId)
.containsOnly(id1, id2, id3);
}
- private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException {
+ private void hasIdsInIndex(ReactorElasticSearchClient client, String... ids) {
SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue())
.scroll(TIMEOUT)
.source(new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery()));
SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT)
+ .block()
.getHits()
.getHits();
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
index 4b76dad..9871600 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java
@@ -25,17 +25,17 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.IndexCreationFactory;
import org.apache.james.backends.es.IndexName;
import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.backends.es.ReadAliasName;
import org.apache.james.backends.es.WriteAliasName;
-import org.elasticsearch.client.RestHighLevelClient;
public class MailboxIndexCreationUtil {
- public static RestHighLevelClient prepareClient(RestHighLevelClient client,
- ReadAliasName readAlias,
- WriteAliasName writeAlias,
- IndexName indexName,
- ElasticSearchConfiguration configuration) throws IOException {
+ public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client,
+ ReadAliasName readAlias,
+ WriteAliasName writeAlias,
+ IndexName indexName,
+ ElasticSearchConfiguration configuration) throws IOException {
return NodeMappingFactory.applyMapping(
new IndexCreationFactory(configuration)
.useIndex(indexName)
@@ -46,7 +46,7 @@ public class MailboxIndexCreationUtil {
MailboxMappingFactory.getMappingContent());
}
- public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException {
+ public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException {
return prepareClient(client,
MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS,
MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS,
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index c3d8595..1430111 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -52,7 +52,6 @@ import org.apache.james.mailbox.model.UpdatedFlags;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
-import org.apache.james.util.OptionalUtils;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +62,8 @@ import com.github.steveash.guavate.Guavate;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
+
public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex {
public static class ElasticSearchListeningMessageSearchIndexGroup extends Group {
@@ -112,7 +113,8 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
return searcher
.search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
- .map(SearchResult::getMessageUid);
+ .map(SearchResult::getMessageUid)
+ .toStream();
}
@Override
@@ -123,15 +125,14 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
return ImmutableList.of();
}
- try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) {
- return searchResults
- .peek(this::logIfNoMessageId)
- .map(SearchResult::getMessageId)
- .flatMap(OptionalUtils::toStream)
- .distinct()
- .limit(limit)
- .collect(Guavate.toImmutableList());
- }
+ return searcher.search(mailboxIds, searchQuery, Optional.empty())
+ .doOnNext(this::logIfNoMessageId)
+ .map(SearchResult::getMessageId)
+ .flatMap(Mono::justOrEmpty)
+ .distinct()
+ .take(limit)
+ .collect(Guavate.toImmutableList())
+ .block();
}
@Override
@@ -144,7 +145,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
String jsonContent = generateIndexedJson(mailbox, message, session);
- elasticSearchIndexer.index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()));
+ elasticSearchIndexer
+ .index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId()))
+ .block();
}
private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
@@ -162,12 +165,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws IOException {
+ public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
elasticSearchIndexer
.delete(expungedUids.stream()
.map(uid -> indexIdFor(mailbox, uid))
.collect(Guavate.toImmutableList()),
- routingKeyFactory.from(mailbox.getMailboxId()));
+ routingKeyFactory.from(mailbox.getMailboxId()))
+ .block();
}
@Override
@@ -181,14 +185,16 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
}
@Override
- public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException {
+ public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream()
.map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags))
.sneakyThrow())
.collect(Guavate.toImmutableList());
- elasticSearchIndexer.update(updates, routingKeyFactory.from(mailbox.getMailboxId()));
+ elasticSearchIndexer
+ .update(updates, routingKeyFactory.from(mailbox.getMailboxId()))
+ .block();
}
private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException {
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
index 85cceb1..7dd6509 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java
@@ -21,10 +21,10 @@ package org.apache.james.mailbox.elasticsearch.search;
import java.util.Collection;
import java.util.Optional;
-import java.util.stream.Stream;
import org.apache.james.backends.es.AliasName;
import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.backends.es.ReadAliasName;
import org.apache.james.backends.es.RoutingKey;
import org.apache.james.backends.es.search.ScrolledSearch;
@@ -37,7 +37,6 @@ import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.SearchQuery;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
@@ -47,6 +46,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Flux;
+
public class ElasticSearchSearcher {
public static final int DEFAULT_SEARCH_SIZE = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class);
@@ -55,7 +56,7 @@ public class ElasticSearchSearcher {
JsonMessageConstants.UID, JsonMessageConstants.MESSAGE_ID);
private static final int MAX_ROUTING_KEY = 5;
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
private final QueryConverter queryConverter;
private final int size;
private final MailboxId.Factory mailboxIdFactory;
@@ -63,7 +64,7 @@ public class ElasticSearchSearcher {
private final AliasName aliasName;
private final RoutingKey.Factory<MailboxId> routingKeyFactory;
- public ElasticSearchSearcher(RestHighLevelClient client, QueryConverter queryConverter, int size,
+ public ElasticSearchSearcher(ReactorElasticSearchClient client, QueryConverter queryConverter, int size,
MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory,
ReadAliasName aliasName, RoutingKey.Factory<MailboxId> routingKeyFactory) {
this.client = client;
@@ -75,14 +76,14 @@ public class ElasticSearchSearcher {
this.routingKeyFactory = routingKeyFactory;
}
- public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
- Optional<Integer> limit) {
+ public Flux<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query,
+ Optional<Integer> limit) {
SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit);
- Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
+ Flux<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest)
.searchHits()
.flatMap(this::extractContentFromHit);
- return limit.map(pairStream::limit)
+ return limit.map(pairStream::take)
.orElse(pairStream);
}
@@ -122,20 +123,20 @@ public class ElasticSearchSearcher {
.orElse(size);
}
- private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
+ private Flux<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
DocumentField uid = hit.field(JsonMessageConstants.UID);
Optional<DocumentField> id = retrieveMessageIdField(hit);
if (mailboxId != null && uid != null) {
Number uidAsNumber = uid.getValue();
- return Stream.of(
+ return Flux.just(
new MessageSearchIndex.SearchResult(
id.map(field -> messageIdFactory.fromString(field.getValue())),
mailboxIdFactory.fromString(mailboxId.getValue()),
MessageUid.of(uidAsNumber.longValue())));
} else {
LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId());
- return Stream.empty();
+ return Flux.empty();
}
}
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 04bc2d6..1813ef3 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -27,6 +27,7 @@ import java.time.ZoneId;
import org.apache.james.backends.es.DockerElasticSearchExtension;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
import org.apache.james.mailbox.MessageManager;
@@ -48,7 +49,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
import org.apache.james.mailbox.tika.TikaTextExtractor;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.mime4j.dom.Message;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -67,7 +67,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
TikaTextExtractor textExtractor;
- RestHighLevelClient client;
+ ReactorElasticSearchClient client;
@AfterEach
void tearDown() throws IOException {
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index f6d1391..95b67f8 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -33,6 +33,7 @@ import javax.mail.util.SharedByteArrayInputStream;
import org.apache.james.backends.es.DockerElasticSearchExtension;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.mailbox.DefaultMailboxes;
import org.apache.james.mailbox.MailboxSession;
@@ -73,7 +74,6 @@ import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.awaitility.Duration;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -155,7 +155,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory();
- RestHighLevelClient client = MailboxIndexCreationUtil.prepareDefaultClient(
+ ReactorElasticSearchClient client = MailboxIndexCreationUtil.prepareDefaultClient(
elasticSearch.getDockerElasticSearch().clientProvider().get(),
elasticSearch.getDockerElasticSearch().configuration());
@@ -256,7 +256,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1))
- .isInstanceOf(IOException.class);
+ .hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
}
@@ -330,7 +330,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it
assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1)))
- .isInstanceOf(IOException.class);
+ .hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
}
@@ -413,7 +413,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
.build();
assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags)))
- .isInstanceOf(IOException.class);
+ .hasCauseInstanceOf(IOException.class);
elasticSearch.getDockerElasticSearch().unpause();
}
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
index f8be9a0..f7d6564 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java
@@ -30,6 +30,7 @@ import java.util.stream.IntStream;
import org.apache.james.backends.es.DockerElasticSearchExtension;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
@@ -58,7 +59,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl;
import org.apache.james.mailbox.tika.TikaTextExtractor;
import org.apache.james.metrics.tests.RecordingMetricFactory;
import org.apache.james.mime4j.dom.Message;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -80,7 +80,7 @@ class ElasticSearchSearcherTest {
DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
TikaTextExtractor textExtractor;
- RestHighLevelClient client;
+ ReactorElasticSearchClient client;
private InMemoryMailboxManager storeMailboxManager;
@BeforeEach
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
index 0a1e082..8765345 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java
@@ -21,13 +21,11 @@ package org.apache.james.quota.search.elasticsearch;
import static org.apache.james.quota.search.elasticsearch.json.JsonMessageConstants.USER;
-import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
-import java.util.stream.Stream;
import org.apache.james.backends.es.AliasName;
import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.backends.es.ReadAliasName;
import org.apache.james.backends.es.search.ScrolledSearch;
import org.apache.james.core.Username;
@@ -35,7 +33,6 @@ import org.apache.james.quota.search.QuotaQuery;
import org.apache.james.quota.search.QuotaSearcher;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -44,14 +41,16 @@ import org.elasticsearch.search.sort.SortOrder;
import com.github.steveash.guavate.Guavate;
+import reactor.core.publisher.Flux;
+
public class ElasticSearchQuotaSearcher implements QuotaSearcher {
private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
private final AliasName readAlias;
private final QuotaQueryConverter quotaQueryConverter;
- public ElasticSearchQuotaSearcher(RestHighLevelClient client, ReadAliasName readAlias) {
+ public ElasticSearchQuotaSearcher(ReactorElasticSearchClient client, ReadAliasName readAlias) {
this.client = client;
this.readAlias = readAlias;
this.quotaQueryConverter = new QuotaQueryConverter();
@@ -60,18 +59,17 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
@Override
public List<Username> search(QuotaQuery query) {
try {
- try (Stream<SearchHit> searchHits = searchHits(query)) {
- return searchHits
- .map(SearchHit::getId)
- .map(Username::of)
- .collect(Guavate.toImmutableList());
- }
- } catch (IOException e) {
+ return searchHits(query)
+ .map(SearchHit::getId)
+ .map(Username::of)
+ .collect(Guavate.toImmutableList())
+ .block();
+ } catch (Exception e) {
throw new RuntimeException("Unexpected exception while executing " + query, e);
}
}
- private Stream<SearchHit> searchHits(QuotaQuery query) throws IOException {
+ private Flux<SearchHit> searchHits(QuotaQuery query) {
if (query.getLimit().isLimited()) {
return executeSingleSearch(query);
} else {
@@ -79,7 +77,7 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
}
}
- private Stream<SearchHit> executeSingleSearch(QuotaQuery query) throws IOException {
+ private Flux<SearchHit> executeSingleSearch(QuotaQuery query) {
SearchSourceBuilder searchSourceBuilder = searchSourceBuilder(query)
.from(query.getOffset().getValue());
query.getLimit().getValue()
@@ -89,12 +87,11 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher {
.types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
.source(searchSourceBuilder);
- return Arrays.stream(client.search(searchRequest, RequestOptions.DEFAULT)
- .getHits()
- .getHits());
+ return client.search(searchRequest, RequestOptions.DEFAULT)
+ .flatMapMany(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits()));
}
- private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) {
+ private Flux<SearchHit> executeScrolledSearch(QuotaQuery query) {
return new ScrolledSearch(client,
new SearchRequest(readAlias.getValue())
.types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
index c1918a9..cf1aad6 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java
@@ -26,11 +26,11 @@ import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.IndexCreationFactory;
import org.apache.james.backends.es.IndexName;
import org.apache.james.backends.es.NodeMappingFactory;
-import org.elasticsearch.client.RestHighLevelClient;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
public class QuotaSearchIndexCreationUtil {
- public static RestHighLevelClient prepareClient(RestHighLevelClient client,
+ public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client,
AliasName readAlias,
AliasName writeAlias,
IndexName indexName,
@@ -46,7 +46,7 @@ public class QuotaSearchIndexCreationUtil {
QuotaRatioMappingFactory.getMappingContent());
}
- public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException {
+ public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException {
return prepareClient(client,
QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS,
QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS,
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
index b124c18..0a80a56 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java
@@ -70,9 +70,11 @@ public class ElasticSearchQuotaMailboxListener implements MailboxListener.GroupM
private void handleEvent(QuotaUsageUpdatedEvent event) throws IOException {
Username user = event.getUsername();
- indexer.index(toDocumentId(user),
- quotaRatioToElasticSearchJson.convertToJson(event),
- routingKeyFactory.from(user));
+ indexer
+ .index(toDocumentId(user),
+ quotaRatioToElasticSearchJson.convertToJson(event),
+ routingKeyFactory.from(user))
+ .block();
}
private DocumentId toDocumentId(Username user) {
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
index cc7d8b2..991c5d2 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.james.backends.es.DockerElasticSearch;
import org.apache.james.backends.es.DockerElasticSearchSingleton;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.dnsservice.api.DNSService;
import org.apache.james.domainlist.memory.MemoryDomainList;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
@@ -34,7 +35,6 @@ import org.apache.james.quota.search.QuotaSearchTestSystem;
import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
import org.apache.james.user.memory.MemoryUsersRepository;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -45,7 +45,7 @@ import org.junit.jupiter.api.extension.ParameterResolver;
public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback {
private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE;
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
index ba482aa..9e5671b 100644
--- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
+++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java
@@ -30,6 +30,7 @@ import org.apache.james.backends.es.DockerElasticSearchExtension;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
import org.apache.james.backends.es.NodeMappingFactory;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.mailbox.events.Event;
import org.apache.james.mailbox.events.Group;
import org.apache.james.mailbox.quota.QuotaFixture.Counts;
@@ -41,7 +42,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.AfterEach;
@@ -58,7 +58,7 @@ class ElasticSearchQuotaMailboxListenerTest {
DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension();
ElasticSearchQuotaMailboxListener quotaMailboxListener;
- RestHighLevelClient client;
+ ReactorElasticSearchClient client;
@BeforeEach
void setUp() throws IOException {
@@ -100,10 +100,11 @@ class ElasticSearchQuotaMailboxListenerTest {
elasticSearch.awaitForElasticSearch();
- SearchResponse searchResponse = client.search(new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue())
- .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
- .source(new SearchSourceBuilder()
- .query(QueryBuilders.matchAllQuery())));
+ SearchRequest searchRequest = new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue())
+ .types(NodeMappingFactory.DEFAULT_MAPPING_NAME)
+ .source(new SearchSourceBuilder()
+ .query(QueryBuilders.matchAllQuery()));
+ SearchResponse searchResponse = client.search(searchRequest).block();
assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1);
}
diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
index 38e1d5c..233fb26 100644
--- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
+++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch;
import org.apache.james.backends.es.DockerElasticSearchSingleton;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.quota.QuotaCountLimit;
import org.apache.james.core.quota.QuotaSizeLimit;
import org.apache.james.imap.api.process.ImapProcessor;
@@ -54,7 +55,6 @@ import org.apache.james.metrics.logger.DefaultMetricFactory;
import org.apache.james.mpt.api.ImapFeatures;
import org.apache.james.mpt.api.ImapFeatures.Feature;
import org.apache.james.mpt.host.JamesImapHostSystem;
-import org.elasticsearch.client.RestHighLevelClient;
public class ElasticSearchHostSystem extends JamesImapHostSystem {
@@ -63,7 +63,7 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem {
private DockerElasticSearch dockerElasticSearch;
private StoreMailboxManager mailboxManager;
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@Override
public void beforeTest() throws Exception {
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
index 7aab691..ed9b27e 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java
@@ -24,10 +24,10 @@ import java.util.Set;
import org.apache.james.backends.es.ClientProvider;
import org.apache.james.backends.es.ElasticSearchHealthCheck;
import org.apache.james.backends.es.IndexName;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.mailbox.elasticsearch.ElasticSearchMailboxConfiguration;
import org.apache.james.quota.search.elasticsearch.ElasticSearchQuotaConfiguration;
-import org.elasticsearch.client.RestHighLevelClient;
import com.google.common.collect.ImmutableSet;
import com.google.inject.AbstractModule;
@@ -41,7 +41,7 @@ public class ElasticSearchClientModule extends AbstractModule {
@Override
protected void configure() {
bind(ClientProvider.class).in(Scopes.SINGLETON);
- bind(RestHighLevelClient.class).toProvider(ClientProvider.class);
+ bind(ReactorElasticSearchClient.class).toProvider(ClientProvider.class);
Multibinder.newSetBinder(binder(), HealthCheck.class)
.addBinding()
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
index bcc8148..b02c837 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.backends.es.RoutingKey;
import org.apache.james.lifecycle.api.StartUpCheck;
import org.apache.james.lifecycle.api.Startable;
@@ -51,7 +52,6 @@ import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,12 +68,12 @@ public class ElasticSearchMailboxModule extends AbstractModule {
private final ElasticSearchConfiguration configuration;
private final ElasticSearchMailboxConfiguration mailboxConfiguration;
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
@Inject
MailboxIndexCreator(ElasticSearchConfiguration configuration,
ElasticSearchMailboxConfiguration mailboxConfiguration,
- RestHighLevelClient client) {
+ ReactorElasticSearchClient client) {
this.configuration = configuration;
this.mailboxConfiguration = mailboxConfiguration;
this.client = client;
@@ -114,7 +114,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
@Provides
@Singleton
@Named(MailboxElasticSearchConstants.InjectionNames.MAILBOX)
- private ElasticSearchIndexer createMailboxElasticSearchIndexer(RestHighLevelClient client,
+ private ElasticSearchIndexer createMailboxElasticSearchIndexer(ReactorElasticSearchClient client,
ElasticSearchMailboxConfiguration configuration) {
return new ElasticSearchIndexer(
client,
@@ -123,7 +123,7 @@ public class ElasticSearchMailboxModule extends AbstractModule {
@Provides
@Singleton
- private ElasticSearchSearcher createMailboxElasticSearchSearcher(RestHighLevelClient client,
+ private ElasticSearchSearcher createMailboxElasticSearchSearcher(ReactorElasticSearchClient client,
QueryConverter queryConverter,
MailboxId.Factory mailboxIdFactory,
MessageId.Factory messageIdFactory,
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
index eb4f9b4..123b02f 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java
@@ -30,6 +30,7 @@ import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.lifecycle.api.Startable;
import org.apache.james.mailbox.events.MailboxListener;
import org.apache.james.quota.search.QuotaSearcher;
@@ -42,7 +43,6 @@ import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearc
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
import org.apache.james.utils.PropertiesProvider;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,12 +57,12 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
static class ElasticSearchQuotaIndexCreator implements Startable {
private final ElasticSearchConfiguration configuration;
private final ElasticSearchQuotaConfiguration quotaConfiguration;
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
@Inject
ElasticSearchQuotaIndexCreator(ElasticSearchConfiguration configuration,
ElasticSearchQuotaConfiguration quotaConfiguration,
- RestHighLevelClient client) {
+ ReactorElasticSearchClient client) {
this.configuration = configuration;
this.quotaConfiguration = quotaConfiguration;
this.client = client;
@@ -88,7 +88,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
@Provides
@Singleton
- public QuotaSearcher provideSearcher(RestHighLevelClient client, ElasticSearchQuotaConfiguration configuration) {
+ public QuotaSearcher provideSearcher(ReactorElasticSearchClient client, ElasticSearchQuotaConfiguration configuration) {
return new ElasticSearchQuotaSearcher(client,
configuration.getReadAliasQuotaRatioName());
}
@@ -107,7 +107,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule {
@Provides
@Singleton
- public ElasticSearchQuotaMailboxListener provideListener(RestHighLevelClient client,
+ public ElasticSearchQuotaMailboxListener provideListener(ReactorElasticSearchClient client,
ElasticSearchQuotaConfiguration configuration) {
return new ElasticSearchQuotaMailboxListener(
new ElasticSearchIndexer(client,
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
index 4bf89d7..3565b38 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java
@@ -24,10 +24,10 @@ import java.io.IOException;
import javax.inject.Inject;
import org.apache.james.backends.es.ElasticSearchConfiguration;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.lifecycle.api.StartUpCheck;
import org.elasticsearch.Version;
import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +40,10 @@ public class ElasticSearchStartUpCheck implements StartUpCheck {
public static final String CHECK_NAME = "ElasticSearchStartUpCheck";
- private final RestHighLevelClient client;
+ private final ReactorElasticSearchClient client;
@Inject
- private ElasticSearchStartUpCheck(RestHighLevelClient client) {
+ private ElasticSearchStartUpCheck(ReactorElasticSearchClient client) {
this.client = client;
}
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
index 23e7cff..81aa4c0 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java
@@ -34,6 +34,7 @@ import java.util.TimerTask;
import java.util.stream.Collectors;
import org.apache.commons.net.imap.IMAPClient;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.core.Username;
import org.apache.james.jmap.AccessToken;
import org.apache.james.jmap.draft.JmapGuiceProbe;
@@ -45,7 +46,6 @@ import org.apache.james.modules.protocols.ImapGuiceProbe;
import org.apache.james.utils.DataProbeImpl;
import org.awaitility.Duration;
import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.AfterEach;
@@ -152,12 +152,13 @@ class ESReporterTest {
}
private boolean checkMetricRecordedInElasticSearch() {
- try (RestHighLevelClient client = elasticSearchExtension.getDockerES().clientProvider().get()) {
+ try (ReactorElasticSearchClient client = elasticSearchExtension.getDockerES().clientProvider().get()) {
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery()));
return !Arrays.stream(client
.search(searchRequest)
+ .block()
.getHits()
.getHits())
.filter(searchHit -> searchHit.getIndex().startsWith(TestDockerESMetricReporterModule.METRICS_INDEX))
diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
index e57771f..392b859 100644
--- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
+++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java
@@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch;
import org.apache.james.backends.es.DockerElasticSearchSingleton;
import org.apache.james.backends.es.ElasticSearchConfiguration;
import org.apache.james.backends.es.ElasticSearchIndexer;
+import org.apache.james.backends.es.ReactorElasticSearchClient;
import org.apache.james.dnsservice.api.DNSService;
import org.apache.james.domainlist.memory.MemoryDomainList;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
@@ -39,7 +40,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory;
import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener;
import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson;
import org.apache.james.user.memory.MemoryUsersRepository;
-import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -53,7 +53,7 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef
private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE;
private WebAdminQuotaSearchTestSystem restQuotaSearchTestSystem;
private TemporaryFolder temporaryFolder = new TemporaryFolder();
- private RestHighLevelClient client;
+ private ReactorElasticSearchClient client;
@Override
public void beforeEach(ExtensionContext context) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org