You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/07/14 22:16:19 UTC

[skywalking] 02/02: Add HealthChecker helper

This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch storage-elasticsearch-health
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 8e16d6178c385d4dc123332c9d977336e7869994
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jul 15 06:15:15 2020 +0800

    Add HealthChecker helper
    
    Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
 .../src/main/resources/application.yml             |   1 +
 .../client/elasticsearch/ElasticSearchClient.java  | 115 ++++++++++++++++-----
 .../library/client/healthcheck/HealthChecker.java  |  49 +++++++++
 .../library/client/healthcheck/HealthListener.java |  30 ++++++
 .../client/jdbc/hikaricp/JDBCHikariCPClient.java   |  24 ++---
 .../StorageModuleElasticsearchConfig.java          |   2 +
 .../StorageModuleElasticsearchProvider.java        |  11 ++
 .../storage/plugin/jdbc/h2/H2StorageProvider.java  |   8 +-
 8 files changed, 195 insertions(+), 45 deletions(-)

diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 0e89003..4293c6f 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -104,6 +104,7 @@ storage:
     segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
     profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
     advanced: ${SW_STORAGE_ES_ADVANCED:""}
+    enableHealthCheck: ${SW_STORAGE_ES_ENABLE_HEALTH_CHECK:false}
   elasticsearch7:
     nameSpace: ${SW_NAMESPACE:""}
     clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 03193ea..059d15f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.net.ssl.SSLContext;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -58,6 +59,8 @@ import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -105,6 +108,8 @@ public class ElasticSearchClient implements Client {
     private volatile String password;
     private final List<IndexNameConverter> indexNameConverters;
     protected volatile RestHighLevelClient client;
+    private volatile HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER; // volatile like client: swapped by activeHealthChecker()
+    private final ReentrantLock connectLock = new ReentrantLock(); // serializes connect(); search() takes it via tryLock()
 
     public ElasticSearchClient(String clusterNodes,
                                String protocol,
@@ -124,16 +129,25 @@ public class ElasticSearchClient implements Client {
 
     @Override
     public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
-        List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
-        if (client != null) {
-            try {
-                client.close();
-            } catch (Throwable t) {
-                log.error("ElasticSearch client reconnection fails based on new config", t);
+        connectLock.lock(); // one (re)connect at a time; search() may call connect() after a reactor failure
+        try {
+            List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
+            if (client != null) {
+                try {
+                    client.close(); // discard the previous client before building a replacement
+                } catch (Throwable t) {
+                    log.error("ElasticSearch client reconnection fails based on new config", t);
+                }
             }
+            client = createClient(hosts);
+            client.ping(RequestOptions.DEFAULT);
+        } finally {
+            connectLock.unlock();
         }
-        client = createClient(hosts);
-        client.ping(RequestOptions.DEFAULT);
+    }
+
+    public void activeHealthChecker(HealthListener healthListener) { // replaces the no-op default; request outcomes now reach healthListener
+        healthChecker = new HealthChecker(healthListener);
     }
 
     protected RestHighLevelClient createClient(
@@ -210,10 +224,23 @@ public class ElasticSearchClient implements Client {
 
     public List<String> retrievalIndexByAliases(String aliases) throws IOException {
         aliases = formatIndexName(aliases);
-        Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
+        Response response; // assigned in the try below; failures are reported to the health checker and rethrown
+        try {
+            response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
+            healthChecker.health(); // any response returned by performRequest counts as healthy, before the status check
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            throw t;
+        }
         if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
             Gson gson = new Gson();
-            InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+            InputStreamReader reader; // NOTE(review): never closed and decoded with the platform default charset — confirm intended
+            try {
+                reader = new InputStreamReader(response.getEntity().getContent());
+            } catch (Throwable t) {
+                healthChecker.unHealth(t); // failing to read the body is also reported as unhealthy
+                throw t;
+            }
             JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
             log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
             return new ArrayList<>(responseJson.keySet());
@@ -308,13 +335,44 @@ public class ElasticSearchClient implements Client {
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source(searchSourceBuilder);
-        return client.search(searchRequest, RequestOptions.DEFAULT);
+        try {
+            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+            healthChecker.health();
+            return response;
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            if (t instanceof IllegalStateException) {
+                IllegalStateException ise = (IllegalStateException) t;
+                // Fixed the issue described in https://github.com/elastic/elasticsearch/issues/39946
+                // Per that issue a client whose I/O reactor has stopped cannot serve requests again, so rebuild it.
+                // tryLock() skips the rebuild if another thread is already reconnecting. The hold taken by
+                // tryLock() must be released in finally, or connect() from any other thread would block forever.
+                if (ise.getMessage() != null && ise.getMessage().contains("I/O reactor status: STOPPED") &&
+                    connectLock.tryLock()) {
+                    try {
+                        connect();
+                    } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
+                        throw new IllegalStateException("Can't reconnect to Elasticsearch", e);
+                    } finally {
+                        connectLock.unlock();
+                    }
+                }
+            }
+            throw t;
+        }
     }
 
     public GetResponse get(String indexName, String id) throws IOException {
         indexName = formatIndexName(indexName);
         GetRequest request = new GetRequest(indexName, TYPE, id);
-        return client.get(request, RequestOptions.DEFAULT);
+        try {
+            GetResponse response = client.get(request, RequestOptions.DEFAULT);
+            healthChecker.health();
+            return response;
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            throw t;
+        }
     }
 
     public SearchResponse ids(String indexName, String[] ids) throws IOException {
@@ -323,28 +376,39 @@ public class ElasticSearchClient implements Client {
         SearchRequest searchRequest = new SearchRequest(indexName);
         searchRequest.types(TYPE);
         searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
-        return client.search(searchRequest, RequestOptions.DEFAULT);
+        try { // same report-and-rethrow pattern as get(): success -> health(), failure -> unHealth(t) then rethrow
+            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+            healthChecker.health();
+            return response;
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            throw t;
+        }
     }
 
     public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
         IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.index(request, RequestOptions.DEFAULT);
-    }
-
-    public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
-        org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
-            indexName, id, source);
-        request.version(version);
-        request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request, RequestOptions.DEFAULT);
+        try { // NOTE(review): this hunk also deletes forceUpdate(String, String, XContentBuilder, long); confirm it has no callers
+            client.index(request, RequestOptions.DEFAULT);
+            healthChecker.health();
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            throw t;
+        }
     }
 
     public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
         org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
             indexName, id, source);
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        client.update(request, RequestOptions.DEFAULT);
+        try {
+            client.update(request, RequestOptions.DEFAULT);
+            healthChecker.health(); // the update completed, so the cluster is reported healthy
+        } catch (Throwable t) {
+            healthChecker.unHealth(t);
+            throw t;
+        }
     }
 
     public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -377,8 +441,15 @@ public class ElasticSearchClient implements Client {
             int size = request.requests().size();
             BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
             log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
+            healthChecker.health();
         } catch (IOException e) {
-            log.error(e.getMessage(), e);
+            // Best-effort as before: the failure is logged by unHealth() and not propagated.
+            healthChecker.unHealth(e);
+        } catch (Throwable t) {
+            // Record every other failure too, but rethrow it so runtime exceptions and Errors
+            // still reach the caller instead of being silently swallowed.
+            healthChecker.unHealth(t);
+            throw t;
         }
     }
 
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java
new file mode 100644
index 0000000..49de2b4
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.healthcheck;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * HealthChecker forwards the outcome of each client call to a {@link HealthListener}.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class HealthChecker {
+    public static final HealthChecker DEFAULT_CHECKER = new HealthChecker(health -> { }); // no-op listener; unHealth() still logs
+
+    private final HealthListener listener;
+
+    /**
+     * Reports a successful call: the listener receives {@code true}.
+     */
+    public void health() {
+        listener.listen(true);
+    }
+
+    /**
+     * Reports a failed call: logs the cause, then the listener receives {@code false}.
+     * @param t the failure that made the service unhealthy.
+     */
+    public void unHealth(Throwable t) {
+        log.error("Health check failed", t); // storage-agnostic: JDBCHikariCPClient reports through this checker too
+        listener.listen(false);
+    }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java
new file mode 100644
index 0000000..700db1c
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.healthcheck;
+
+/**
+ * HealthListener receives the health status reported by a {@link HealthChecker}.
+ */
+public interface HealthListener {
+
+    /**
+     * Receives the latest status: {@code true} after a successful call, {@code false} after a failed one.
+     */
+    void listen(boolean isHealthy);
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index f99a6a0..6cf15de 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -26,11 +26,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,20 +41,14 @@ public class JDBCHikariCPClient implements Client {
 
     private HikariDataSource dataSource;
     private HikariConfig hikariConfig;
+    private HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER; // no-op until activeHealthCheck() is called
 
     public JDBCHikariCPClient(Properties properties) {
         hikariConfig = new HikariConfig(properties);
     }
 
-    public void setHealthCheckListener(Consumer<Boolean> healthListener) {
-        ScheduledExecutorService asyncHealthScheduler = Executors.newSingleThreadScheduledExecutor();
-        asyncHealthScheduler.scheduleAtFixedRate(() -> {
-            try (Connection c = dataSource.getConnection()) {
-                healthListener.accept(true);
-            } catch (SQLException ignored) {
-                healthListener.accept(false);
-            }
-        }, 0, 3, TimeUnit.SECONDS);
+    public void activeHealthCheck(HealthListener listener) { // status is derived from statement outcomes in this class; nothing is reported while idle
+        healthChecker = new HealthChecker(listener);
     }
 
     @Override
@@ -93,7 +85,9 @@ public class JDBCHikariCPClient implements Client {
         logger.debug("execute aql: {}", sql);
         try (Statement statement = connection.createStatement()) {
             statement.execute(sql);
+            healthChecker.health();
         } catch (SQLException e) {
+            healthChecker.unHealth(e); // logs e and reports false before the wrapped rethrow
             throw new JDBCClientException(e.getMessage(), e);
         }
     }
@@ -107,6 +101,7 @@ public class JDBCHikariCPClient implements Client {
             setStatementParam(statement, params);
             result = statement.execute();
             statement.closeOnCompletion();
+            healthChecker.health();
         } catch (SQLException e) {
             if (statement != null) {
                 try {
@@ -114,6 +109,7 @@ public class JDBCHikariCPClient implements Client {
                 } catch (SQLException e1) {
                 }
             }
+            healthChecker.unHealth(e);
             throw new JDBCClientException(e.getMessage(), e);
         }
 
@@ -129,6 +125,7 @@ public class JDBCHikariCPClient implements Client {
             setStatementParam(statement, params);
             rs = statement.executeQuery();
             statement.closeOnCompletion();
+            healthChecker.health();
         } catch (SQLException e) {
             if (statement != null) {
                 try {
@@ -136,6 +133,7 @@ public class JDBCHikariCPClient implements Client {
                 } catch (SQLException e1) {
                 }
             }
+            healthChecker.unHealth(e);
             throw new JDBCClientException(e.getMessage(), e);
         }
 
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index a33e992..5435e72 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -92,5 +92,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
     private int profileTaskQueryMaxSize = 200;
     @Setter
     private String advanced;
+    @Setter
+    private boolean enableHealthCheck; // SW_STORAGE_ES_ENABLE_HEALTH_CHECK; when true, request outcomes drive the storage_elasticsearch gauge
 
 }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 9f591cb..8622d63 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -74,6 +74,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNR
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
 import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 /**
  * The storage provider for ElasticSearch 6.
@@ -181,6 +185,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
 
     @Override
     public void start() throws ModuleStartException {
+        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
+        GaugeMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+        healthChecker.setValue(1); // unhealthy until startup succeeds
         try {
             elasticSearchClient.connect();
             StorageEsInstaller installer = new StorageEsInstaller(elasticSearchClient, getManager(), config);
@@ -189,6 +196,13 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         } catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
+        // connect() and the index installation succeeded, so clear the initial unhealthy value;
+        // otherwise the gauge stays at 1 forever when the health check is disabled (the default).
+        healthChecker.setValue(0);
+        if (!config.isEnableHealthCheck()) {
+            return;
+        }
+        elasticSearchClient.activeHealthChecker(isHealthy -> healthChecker.setValue(isHealthy ? 0 : 1));
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 067bf2c..88288cc 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -146,13 +146,7 @@ public class H2StorageProvider extends ModuleProvider {
         } catch (StorageException e) {
             throw new ModuleStartException(e.getMessage(), e);
         }
-        h2Client.setHealthCheckListener(isHealthy -> {
-            if (isHealthy) {
-                healthChecker.setValue(0);
-            } else {
-                healthChecker.setValue(1);
-            }
-        });
+        h2Client.activeHealthCheck(healthy -> healthChecker.setValue(healthy ? 0 : 1)); // gauge: 0 = healthy, 1 = unhealthy
     }
 
     @Override