You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/06/30 12:58:05 UTC

[skywalking] 01/01: Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch es-timeout-setting
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 43987bf41a1b1c53cff6551ff1b60bf9253006ac
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Wed Jun 30 20:57:49 2021 +0800

    Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.
---
 CHANGES.md                                                  |  1 +
 docs/en/setup/backend/configuration-vocabulary.md           |  4 ++++
 .../server-bootstrap/src/main/resources/application.yml     |  2 ++
 .../library/client/elasticsearch/ElasticSearchClient.java   | 13 ++++++++++++-
 .../elasticsearch/StorageModuleElasticsearchConfig.java     | 12 ++++++++++++
 .../elasticsearch/StorageModuleElasticsearchProvider.java   |  4 ++--
 .../elasticsearch7/StorageModuleElasticsearch7Provider.java |  2 +-
 .../plugin/elasticsearch7/client/ElasticSearch7Client.java  |  9 ++++-----
 8 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9323e99..5c62152 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -60,6 +60,7 @@ Release Notes.
 * Performance: enhance persistent session mechanism, about differentiating cache timeout for different dimensionality
   metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min.
 * Performance: Add L1 aggregation flush period, which reduce the CPU load and help young GC.
+* Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.
 
 #### UI
 
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index 3fa9376..6a79c62 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -81,6 +81,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - |
 | - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost|
 | - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP|
+| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500|
+| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 60000|
 | - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - |
 | - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - |
 | - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - |
@@ -104,6 +106,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
 | - | - | nameSpace | Prefix of indexes created and used by SkyWalking. | SW_NAMESPACE | - |
 | - | - | clusterNodes | ElasticSearch cluster nodes for client connection.| SW_STORAGE_ES_CLUSTER_NODES |localhost|
 | - | - | protocol | HTTP or HTTPs. | SW_STORAGE_ES_HTTP_PROTOCOL | HTTP|
+| - | - | connectTimeout | Connect timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_CONNECT_TIMEOUT | 500|
+| - | - | socketTimeout | Socket timeout of ElasticSearch client. Unit is ms. | SW_STORAGE_ES_SOCKET_TIMEOUT | 60000|
 | - | - | user| User name of ElasticSearch cluster| SW_ES_USER | - |
 | - | - | password | Password of ElasticSearch cluster | SW_ES_PASSWORD | - |
 | - | - | trustStorePath | Trust JKS file path. Only work when user name and password opened | SW_STORAGE_ES_SSL_JKS_PATH | - |
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml
index 2f888fd..ff307c3 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -120,6 +120,8 @@ storage:
     nameSpace: ${SW_NAMESPACE:""}
     clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
     protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
+    connectTimeout: ${SW_STORAGE_ES_CONNECT_TIMEOUT:500}
+    socketTimeout: ${SW_STORAGE_ES_SOCKET_TIMEOUT:60000}
     user: ${SW_ES_USER:""}
     password: ${SW_ES_PASSWORD:""}
     trustStorePath: ${SW_STORAGE_ES_SSL_JKS_PATH:""}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index 503fa30..3727fd8 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -119,6 +119,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
     protected volatile RestHighLevelClient client;
     protected DelegatedHealthChecker healthChecker = new DelegatedHealthChecker();
     protected final ReentrantLock connectLock = new ReentrantLock();
+    private final int connectTimeout;
+    private final int socketTimeout;
 
     public ElasticSearchClient(String clusterNodes,
                                String protocol,
@@ -126,7 +128,9 @@ public class ElasticSearchClient implements Client, HealthCheckable {
                                String trustStorePass,
                                String user,
                                String password,
-                               List<IndexNameConverter> indexNameConverters) {
+                               List<IndexNameConverter> indexNameConverters,
+                               int connectTimeout,
+                               int socketTimeout) {
         this.clusterNodes = clusterNodes;
         this.protocol = protocol;
         this.user = user;
@@ -134,6 +138,8 @@ public class ElasticSearchClient implements Client, HealthCheckable {
         this.indexNameConverters = indexNameConverters;
         this.trustStorePath = trustStorePath;
         this.trustStorePass = trustStorePass;
+        this.connectTimeout = connectTimeout;
+        this.socketTimeout = socketTimeout;
     }
 
     @Override
@@ -183,6 +189,11 @@ public class ElasticSearchClient implements Client, HealthCheckable {
         } else {
             builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
         }
+        builder.setRequestConfigCallback(
+            requestConfigBuilder -> requestConfigBuilder
+                .setConnectTimeout(connectTimeout)
+                .setSocketTimeout(socketTimeout)
+        );
 
         return new RestHighLevelClient(builder);
     }
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 126efc2..b62c3f2 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -30,6 +30,18 @@ public class StorageModuleElasticsearchConfig extends ModuleConfig {
     private String clusterNodes;
     String protocol = "http";
     /**
+     * Connect timeout of ElasticSearch client.
+     *
+     * @since 8.7.0
+     */
+    private int connectTimeout = 500;
+    /**
+     * Socket timeout of ElasticSearch client.
+     *
+     * @since 8.7.0
+     */
+    private int socketTimeout = 60000;
+    /**
      * Since 6.4.0, the index of metrics and traces data in minute/hour/month precision are organized in days. ES
      * storage creates new indexes in every day.
      *
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 5b8019f..ce76619 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -163,11 +163,11 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         elasticSearchClient = new ElasticSearchClient(
             config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
             .getTrustStorePass(), config.getUser(), config.getPassword(),
-            indexNameConverters(config.getNameSpace())
+            indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout()
         );
         this.registerServiceImplementation(
             IBatchDAO.class,
-            new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(),  config
+            new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config
                 .getFlushInterval(), config.getConcurrentRequests())
         );
         this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
index b0eebef..2eb218e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/StorageModuleElasticsearch7Provider.java
@@ -161,7 +161,7 @@ public class StorageModuleElasticsearch7Provider extends ModuleProvider {
         elasticSearch7Client = new ElasticSearch7Client(
             config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config
             .getTrustStorePass(), config.getUser(), config.getPassword(),
-            indexNameConverters(config.getNameSpace())
+            indexNameConverters(config.getNameSpace()), config.getConnectTimeout(), config.getSocketTimeout()
         );
         this.registerServiceImplementation(
             IBatchDAO.class,
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
index c0e0d72..2d720f0 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch7-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch7/client/ElasticSearch7Client.java
@@ -78,9 +78,6 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
 import org.elasticsearch.index.reindex.DeleteByQueryRequest;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
-/**
- *
- */
 @Slf4j
 public class ElasticSearch7Client extends ElasticSearchClient {
     public ElasticSearch7Client(final String clusterNodes,
@@ -89,10 +86,12 @@ public class ElasticSearch7Client extends ElasticSearchClient {
                                 final String trustStorePass,
                                 final String user,
                                 final String password,
-                                List<IndexNameConverter> indexNameConverters) {
+                                List<IndexNameConverter> indexNameConverters,
+                                int connectTimeout,
+                                int socketTimeout) {
         super(
             clusterNodes, protocol, trustStorePath, trustStorePass, user, password,
-            indexNameConverters
+            indexNameConverters, connectTimeout, socketTimeout
         );
     }