You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/09/24 13:21:07 UTC

[skywalking] branch enhance/socketTimeout created (now b6007c4)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a change to branch enhance/socketTimeout
in repository https://gitbox.apache.org/repos/asf/skywalking.git.


      at b6007c4  Add `socketTimeout` back to the new implementation

This branch includes the following new commits:

     new b6007c4  Add `socketTimeout` back to the new implementation

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[skywalking] 01/01: Add `socketTimeout` back to the new implementation

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch enhance/socketTimeout
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit b6007c480963c1c7bcc2f3d6395624b38db3fb08
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Sep 24 16:03:15 2021 +0800

    Add `socketTimeout` back to the new implementation
---
 .../library/client/elasticsearch/ElasticSearchClient.java    |  1 +
 .../skywalking/library/elasticsearch/ElasticSearch.java      |  6 +++++-
 .../library/elasticsearch/ElasticSearchBuilder.java          | 12 +++++++++++-
 .../skywalking/library/elasticsearch/bulk/BulkProcessor.java |  7 ++++++-
 4 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
index b6801bf..03a9116 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -117,6 +117,7 @@ public class ElasticSearchClient implements Client, HealthCheckable {
                 .endpoints(clusterNodes.split(","))
                 .protocol(protocol)
                 .connectTimeout(connectTimeout)
+                .socketTimeout(socketTimeout)
                 .numHttpClientThread(numHttpClientThread)
                 .healthyListener(healthy -> {
                     if (healthy) {
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
index 8b416dc..47607b9 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearch.java
@@ -36,6 +36,7 @@ import com.linecorp.armeria.common.util.Exceptions;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -80,7 +81,8 @@ public final class ElasticSearch implements Closeable {
                   String username, String password,
                   EndpointGroup endpointGroup,
                   ClientFactory clientFactory,
-                  Consumer<Boolean> healthyListener) {
+                  Consumer<Boolean> healthyListener,
+                  Duration socketTimeout) {
         this.endpointGroup = endpointGroup;
         this.clientFactory = clientFactory;
         if (healthyListener != null) {
@@ -93,6 +95,8 @@ public final class ElasticSearch implements Closeable {
         final WebClientBuilder builder =
             WebClient.builder(protocol, endpointGroup)
                      .factory(clientFactory)
+                     .responseTimeout(socketTimeout)
+                     .writeTimeout(socketTimeout)
                      .decorator(LoggingClient.builder()
                                              .logger(log)
                                              .newDecorator())
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
index d154f32..5e2a834 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/ElasticSearchBuilder.java
@@ -63,6 +63,8 @@ public final class ElasticSearchBuilder {
 
     private Duration connectTimeout = Duration.ofMillis(500);
 
+    private Duration socketTimeout = Duration.ofSeconds(30);
+
     private Consumer<Boolean> healthyListener;
 
     private int numHttpClientThread;
@@ -117,6 +119,13 @@ public final class ElasticSearchBuilder {
         return this;
     }
 
+
+    public ElasticSearchBuilder socketTimeout(int socketTimeout) {
+        checkArgument(socketTimeout > 0, "socketTimeout must be positive");
+        this.socketTimeout = Duration.ofMillis(socketTimeout);
+        return this;
+    }
+
     public ElasticSearchBuilder healthyListener(Consumer<Boolean> healthyListener) {
         requireNonNull(healthyListener, "healthyListener");
         this.healthyListener = healthyListener;
@@ -183,7 +192,8 @@ public final class ElasticSearchBuilder {
             password,
             endpointGroup,
             clientFactory,
-            healthyListener
+            healthyListener,
+            socketTimeout
         );
     }
 }
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index a8f72ec..93c49ae 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -84,9 +84,10 @@ public final class BulkProcessor {
         return this;
     }
 
+    @SneakyThrows
     private void internalAdd(Object request) {
         requireNonNull(request, "request");
-        requests.add(request);
+        requests.put(request);
         flushIfNeeded();
     }
 
@@ -120,6 +121,10 @@ public final class BulkProcessor {
     private CompletableFuture<Void> doFlush(final List<Object> batch) {
         log.debug("Executing bulk with {} requests", batch.size());
 
+        if (batch.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
         final CompletableFuture<Void> future = es.get().version().thenCompose(v -> {
             try {
                 final RequestFactory rf = v.requestFactory();