You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to notifications@skywalking.apache.org by ha...@apache.org on 2020/07/14 22:16:19 UTC
[skywalking] 02/02: Add HealthChecker helper
This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch storage-elasticsearch-health
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 8e16d6178c385d4dc123332c9d977336e7869994
Author: Gao Hongtao <ha...@gmail.com>
AuthorDate: Wed Jul 15 06:15:15 2020 +0800
Add HealthChecker helper
Signed-off-by: Gao Hongtao <ha...@gmail.com>
---
.../src/main/resources/application.yml | 1 +
.../client/elasticsearch/ElasticSearchClient.java | 115 ++++++++++++++++-----
.../library/client/healthcheck/HealthChecker.java | 49 +++++++++
.../library/client/healthcheck/HealthListener.java | 30 ++++++
.../client/jdbc/hikaricp/JDBCHikariCPClient.java | 24 ++---
.../StorageModuleElasticsearchConfig.java | 2 +
.../StorageModuleElasticsearchProvider.java | 11 ++
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 8 +-
8 files changed, 195 insertions(+), 45 deletions(-)
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 0e89003..4293c6f 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -104,6 +104,7 @@ storage:
segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
profileTaskQueryMaxSize: ${SW_STORAGE_ES_QUERY_PROFILE_TASK_SIZE:200}
advanced: ${SW_STORAGE_ES_ADVANCED:""}
+ # When true, the outcome of each Elasticsearch request updates the storage_elasticsearch health-check gauge.
+ enableHealthCheck: ${SW_STORAGE_ES_ENABLE_HEALTH_CHECK:false}
elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 03193ea..059d15f 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -58,6 +59,8 @@ import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -105,6 +108,8 @@ public class ElasticSearchClient implements Client {
private volatile String password;
private final List<IndexNameConverter> indexNameConverters;
protected volatile RestHighLevelClient client;
+ // Replaced by activeHealthChecker() after construction and read by request threads, hence volatile.
+ private volatile HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER;
+ // Serializes connect(); search() also takes it via tryLock() so only one thread rebuilds the client.
+ private final ReentrantLock connectLock = new ReentrantLock();
public ElasticSearchClient(String clusterNodes,
String protocol,
@@ -124,16 +129,25 @@ public class ElasticSearchClient implements Client {
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
- List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
- if (client != null) {
- try {
- client.close();
- } catch (Throwable t) {
- log.error("ElasticSearch client reconnection fails based on new config", t);
+ connectLock.lock();
+ try {
+ List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Throwable t) {
+ log.error("ElasticSearch client reconnection fails based on new config", t);
+ }
}
+ client = createClient(hosts);
+ client.ping(RequestOptions.DEFAULT);
+ } finally {
+ connectLock.unlock();
}
- client = createClient(hosts);
- client.ping(RequestOptions.DEFAULT);
+ }
+
+ /**
+ * Report the outcome of subsequent requests made through this client to {@code healthListener}.
+ *
+ * @param healthListener receives {@code true} after a successful request and {@code false} after a failed one.
+ */
+ public void activeHealthChecker(HealthListener healthListener) {
+ healthChecker = new HealthChecker(healthListener);
}
protected RestHighLevelClient createClient(
@@ -210,10 +224,23 @@ public class ElasticSearchClient implements Client {
public List<String> retrievalIndexByAliases(String aliases) throws IOException {
aliases = formatIndexName(aliases);
- Response response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
+ Response response;
+ try {
+ response = client.getLowLevelClient().performRequest(new Request(HttpGet.METHOD_NAME, "/_alias/" + aliases));
+ healthChecker.health();
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
Gson gson = new Gson();
- InputStreamReader reader = new InputStreamReader(response.getEntity().getContent());
+ InputStreamReader reader;
+ try {
+ reader = new InputStreamReader(response.getEntity().getContent());
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
JsonObject responseJson = gson.fromJson(reader, JsonObject.class);
log.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
return new ArrayList<>(responseJson.keySet());
@@ -308,13 +335,39 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source(searchSourceBuilder);
- return client.search(searchRequest, RequestOptions.DEFAULT);
+ try {
+ SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+ healthChecker.health();
+ return response;
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ if (t instanceof IllegalStateException) {
+ // getMessage() may be null; guard it so an NPE does not mask the original failure.
+ String message = t.getMessage();
+ // Fixed the issue described in https://github.com/elastic/elasticsearch/issues/39946
+ // tryLock(): only one thread rebuilds the client, the others just rethrow.
+ if (message != null && message.contains("I/O reactor status: STOPPED") &&
+ connectLock.tryLock()) {
+ try {
+ connect();
+ } catch (KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
+ throw new IllegalStateException("Can't reconnect to Elasticsearch", e);
+ } finally {
+ // Release the hold taken by tryLock(); connect() only releases the hold it acquires itself.
+ connectLock.unlock();
+ }
+ }
+ }
+ throw t;
+ }
}
public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, TYPE, id);
- return client.get(request, RequestOptions.DEFAULT);
+ try {
+ GetResponse response = client.get(request, RequestOptions.DEFAULT);
+ healthChecker.health();
+ return response;
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
}
public SearchResponse ids(String indexName, String[] ids) throws IOException {
@@ -323,28 +376,39 @@ public class ElasticSearchClient implements Client {
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(TYPE);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
- return client.search(searchRequest, RequestOptions.DEFAULT);
+ try {
+ SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
+ healthChecker.health();
+ return response;
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.index(request, RequestOptions.DEFAULT);
- }
-
- public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
- org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
- indexName, id, source);
- request.version(version);
- request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request, RequestOptions.DEFAULT);
+ try {
+ client.index(request, RequestOptions.DEFAULT);
+ healthChecker.health();
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(
indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
- client.update(request, RequestOptions.DEFAULT);
+ try {
+ client.update(request, RequestOptions.DEFAULT);
+ healthChecker.health();
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
+ throw t;
+ }
}
public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
@@ -377,8 +441,9 @@ public class ElasticSearchClient implements Client {
int size = request.requests().size();
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
log.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
+ healthChecker.health();
+ } catch (Throwable t) {
+ healthChecker.unHealth(t);
}
}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java
new file mode 100644
index 0000000..49de2b4
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.healthcheck;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * HealthChecker forwards the health status of a storage client to a {@link HealthListener}.
+ * Clients call {@link #health()} after a successful request and {@link #unHealth(Throwable)} after a failed one.
+ * It is shared by several client implementations, so it must not assume a specific backend.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class HealthChecker {
+ /**
+ * A checker whose listener discards every status update.
+ */
+ public static final HealthChecker DEFAULT_CHECKER = new HealthChecker(health -> { });
+
+ private final HealthListener listener;
+
+ /**
+ * Invoking when service is healthy.
+ */
+ public void health() {
+ listener.listen(true);
+ }
+
+ /**
+ * Invoking when service is unhealthy. The cause is logged before the listener is notified.
+ * @param t the reason of unhealthy status.
+ */
+ public void unHealth(Throwable t) {
+ log.error("Health check fails", t);
+ listener.listen(false);
+ }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java
new file mode 100644
index 0000000..700db1c
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/healthcheck/HealthListener.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.healthcheck;
+
+/**
+ * HealthListener receives the health status reported through a {@link HealthChecker}.
+ */
+@FunctionalInterface
+public interface HealthListener {
+
+ /**
+ * Listening health status.
+ *
+ * @param isHealthy {@code true} when the service is reported healthy, {@code false} when it is reported unhealthy.
+ */
+ void listen(boolean isHealthy);
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index f99a6a0..6cf15de 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -26,11 +26,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthChecker;
+import org.apache.skywalking.oap.server.library.client.healthcheck.HealthListener;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,20 +41,14 @@ public class JDBCHikariCPClient implements Client {
private HikariDataSource dataSource;
private HikariConfig hikariConfig;
+ // Replaced by activeHealthCheck() after construction and read by request threads, hence volatile.
+ private volatile HealthChecker healthChecker = HealthChecker.DEFAULT_CHECKER;
public JDBCHikariCPClient(Properties properties) {
hikariConfig = new HikariConfig(properties);
}
- public void setHealthCheckListener(Consumer<Boolean> healthListener) {
- ScheduledExecutorService asyncHealthScheduler = Executors.newSingleThreadScheduledExecutor();
- asyncHealthScheduler.scheduleAtFixedRate(() -> {
- try (Connection c = dataSource.getConnection()) {
- healthListener.accept(true);
- } catch (SQLException ignored) {
- healthListener.accept(false);
- }
- }, 0, 3, TimeUnit.SECONDS);
+ /**
+ * Report the outcome of subsequent statements executed through this client to {@code listener}.
+ *
+ * @param listener receives {@code true} after a successful statement and {@code false} after a {@link SQLException}.
+ */
+ public void activeHealthCheck(HealthListener listener) {
+ healthChecker = new HealthChecker(listener);
}
@Override
@@ -93,7 +85,9 @@ public class JDBCHikariCPClient implements Client {
logger.debug("execute aql: {}", sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
+ healthChecker.health();
} catch (SQLException e) {
+ healthChecker.unHealth(e);
throw new JDBCClientException(e.getMessage(), e);
}
}
@@ -107,6 +101,7 @@ public class JDBCHikariCPClient implements Client {
setStatementParam(statement, params);
result = statement.execute();
statement.closeOnCompletion();
+ healthChecker.health();
} catch (SQLException e) {
if (statement != null) {
try {
@@ -114,6 +109,7 @@ public class JDBCHikariCPClient implements Client {
} catch (SQLException e1) {
}
}
+ healthChecker.unHealth(e);
throw new JDBCClientException(e.getMessage(), e);
}
@@ -129,6 +125,7 @@ public class JDBCHikariCPClient implements Client {
setStatementParam(statement, params);
rs = statement.executeQuery();
statement.closeOnCompletion();
+ healthChecker.health();
} catch (SQLException e) {
if (statement != null) {
try {
@@ -136,6 +133,7 @@ public class JDBCHikariCPClient implements Client {
} catch (SQLException e1) {
}
}
+ healthChecker.unHealth(e);
throw new JDBCClientException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index a33e992..5435e72 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -92,5 +92,7 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
private int profileTaskQueryMaxSize = 200;
@Setter
private String advanced;
+ /**
+ * When true, the provider registers a health listener on the Elasticsearch client so that request
+ * outcomes update the storage_elasticsearch health-check gauge.
+ */
+ @Setter
+ private boolean enableHealthCheck;
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 9f591cb..8622d63 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -74,6 +74,10 @@ import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNR
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.GaugeMetrics;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
* The storage provider for ElasticSearch 6.
@@ -181,6 +185,9 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
@Override
public void start() throws ModuleStartException {
+ MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME).provider().getService(MetricsCreator.class);
+ GaugeMetrics healthChecker = metricCreator.createHealthCheckerGauge("storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
+ // Gauge encoding: 0 = healthy, 1 = unhealthy. Report unhealthy until the storage has been connected.
+ healthChecker.setValue(1);
try {
elasticSearchClient.connect();
StorageEsInstaller installer = new StorageEsInstaller(elasticSearchClient, getManager(), config);
@@ -189,6 +196,10 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e);
}
+ // The try block completed, so connect() (which pings the cluster) succeeded: report healthy.
+ // Without this the gauge would stay at 1 forever whenever the health check is disabled.
+ healthChecker.setValue(0);
+ if (!config.isEnableHealthCheck()) {
+ return;
+ }
+ elasticSearchClient.activeHealthChecker(isHealthy -> healthChecker.setValue(isHealthy ? 0 : 1));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 067bf2c..88288cc 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -146,13 +146,7 @@ public class H2StorageProvider extends ModuleProvider {
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
- h2Client.setHealthCheckListener(isHealthy -> {
- if (isHealthy) {
- healthChecker.setValue(0);
- } else {
- healthChecker.setValue(1);
- }
- });
+ // Each statement's outcome updates the gauge: 0 = healthy, 1 = unhealthy.
+ h2Client.activeHealthCheck(healthy -> healthChecker.setValue(healthy ? 0 : 1));
}
@Override