You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/17 15:30:39 UTC

[james-project] 05/16: JAMES-3117 Reactive ElasticSearch healthCheck

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7c8d8b19443879fc967ffaf679eca28258e68c08
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 17 08:18:03 2020 +0700

    JAMES-3117 Reactive ElasticSearch healthCheck
---
 .../james/backends/es/ElasticSearchHealthCheck.java     | 17 +++++++----------
 .../james/backends/es/ReactorElasticSearchClient.java   | 12 +++++++-----
 2 files changed, 14 insertions(+), 15 deletions(-)

diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
index 89037e7..4dded30 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java
@@ -30,11 +30,12 @@ import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Requests;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Mono;
+
 
 public class ElasticSearchHealthCheck implements HealthCheck {
     private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend");
@@ -54,20 +55,16 @@ public class ElasticSearchHealthCheck implements HealthCheck {
     }
 
     @Override
-    public Result check() {
+    public Mono<Result> checkReactive() {
         String[] indices = indexNames.stream()
             .map(IndexName::getValue)
             .toArray(String[]::new);
         ClusterHealthRequest request = Requests.clusterHealthRequest(indices);
 
-        try {
-            ClusterHealthResponse response = client.cluster()
-                .health(request, RequestOptions.DEFAULT);
-
-            return toHealthCheckResult(response);
-        } catch (IOException e) {
-            return Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e);
-        }
+        return client.health(request)
+            .map(this::toHealthCheckResult)
+            .onErrorResume(IOException.class, e -> Mono.just(
+                Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e)));
     }
 
     @VisibleForTesting
diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
index 3b0c18b..4885b08 100644
--- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
+++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.function.Consumer;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse;
 import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
@@ -45,7 +47,6 @@ import org.elasticsearch.action.search.MultiSearchResponse;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequest;
-import org.elasticsearch.client.ClusterClient;
 import org.elasticsearch.client.IndicesClient;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
@@ -76,10 +77,6 @@ public class ReactorElasticSearchClient implements AutoCloseable {
         return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener));
     }
 
-    public ClusterClient cluster() {
-        return client.cluster();
-    }
-
     public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException {
         return client.delete(deleteRequest, options);
     }
@@ -141,6 +138,11 @@ public class ReactorElasticSearchClient implements AutoCloseable {
         return toReactor(listener -> client.searchAsync(searchRequest, options, listener));
     }
 
+    public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) {
+        return toReactor(listener -> client.cluster()
+            .healthAsync(request, RequestOptions.DEFAULT, listener));
+    }
+
     public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) {
         return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org