You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:39 UTC
[james-project] 05/16: JAMES-3117 Reactive ElasticSearch healthCHeck
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7c8d8b19443879fc967ffaf679eca28258e68c08
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 17 08:18:03 2020 +0700
JAMES-3117 Reactive ElasticSearch healthCHeck
---
.../james/backends/es/ElasticSearchHealthCheck.java | 17 +++++++----------
.../james/backends/es/ReactorElasticSearchClient.java | 12 +++++++-----
2 files changed, 14 insertions(+), 15 deletions(-)
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
index 89037e7..4dded30 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
@@ -30,11 +30,12 @@ import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.core.healthcheck.Result;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import com.google.common.annotations.VisibleForTesting;
+import reactor.core.publisher.Mono;
+
public class ElasticSearchHealthCheck implements HealthCheck {
private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend");
@@ -54,20 +55,16 @@ public class ElasticSearchHealthCheck implements HealthCheck {
}
@Override
- public Result check() {
+ public Mono<Result> checkReactive() {
String[] indices = indexNames.stream()
.map(IndexName::getValue)
.toArray(String[]::new);
ClusterHealthRequest request = Requests.clusterHealthRequest(indices);
- try {
- ClusterHealthResponse response = client.cluster()
- .health(request, RequestOptions.DEFAULT);
-
- return toHealthCheckResult(response);
- } catch (IOException e) {
- return Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e);
- }
+ return client.health(request)
+ .map(this::toHealthCheckResult)
+ .onErrorResume(IOException.class, e -> Mono.just(
+ Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
}
@VisibleForTesting
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
index 3b0c18b..4885b08 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
@@ -23,6 +23,8 @@ import java.io.IOException;
import java.util.function.Consumer;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
@@ -45,7 +47,6 @@ import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.ClusterClient;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
@@ -76,10 +77,6 @@ public class ReactorElasticSearchClient implements AutoCloseable {
return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
}
- public ClusterClient cluster() {
- return client.cluster();
- }
-
public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
return client.delete(deleteRequest, options);
}
@@ -141,6 +138,11 @@ public class ReactorElasticSearchClient implements AutoCloseable {
return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
}
+ public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) {
+ return toReactor(listener -> client.cluster()
+ .healthAsync(request, RequestOptions.DEFAULT, listener));
+ }
+
public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org