You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/19 12:40:08 UTC

[skywalking] branch master updated: fix ES flush thread will stop work when flush schedule task have ES connection exception (#8909)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 00b58f6210 fix ES flush thread will stop work when flush schedule task have ES connection exception (#8909)
00b58f6210 is described below

commit 00b58f6210a4fee868ced26b627d5cd18ae23f92
Author: yangyiweigege <28...@qq.com>
AuthorDate: Tue Apr 19 20:39:55 2022 +0800

    fix ES flush thread will stop work when flush schedule task have ES connection exception (#8909)
    
    Co-authored-by: xishui <xi...@wacai.com>
---
 docs/en/changes/changes.md                                            | 1 +
 .../apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java   | 4 +++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index f69d8c8084..11847baff3 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -20,6 +20,7 @@
 * Set `SW_QUERY_MAX_QUERY_COMPLEXITY` default value to `1000`
 * Webapp module (for UI) enabled compression.
 * [Breaking Change] Add layer field to event, report an event without layer is not allowed.
+* Fix ES flush thread stopping when the scheduled flush task throws an exception, such as a failed ElasticSearch flush.
 
 #### UI
 
diff --git a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
index 2733b13018..a5d23e0958 100644
--- a/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
+++ b/oap-server/server-library/library-elasticsearch-client/src/main/java/org/apache/skywalking/library/elasticsearch/bulk/BulkProcessor.java
@@ -37,6 +37,7 @@ import org.apache.skywalking.library.elasticsearch.ElasticSearch;
 import org.apache.skywalking.library.elasticsearch.requests.IndexRequest;
 import org.apache.skywalking.library.elasticsearch.requests.UpdateRequest;
 import org.apache.skywalking.library.elasticsearch.requests.factory.RequestFactory;
+import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection;
 
 import static java.util.Objects.requireNonNull;
 
@@ -72,7 +73,8 @@ public final class BulkProcessor {
         scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
         scheduler.setRemoveOnCancelPolicy(true);
         scheduler.scheduleWithFixedDelay(
-            this::flush, 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
+                new RunnableWithExceptionProtection(this::flush,
+                        t -> log.error("flush data to ES failure:", t)), 0, flushInterval.getSeconds(), TimeUnit.SECONDS);
     }
 
     public CompletableFuture<Void> add(IndexRequest request) {