You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/26 02:27:39 UTC

[pulsar] branch master updated: [improve][es-sink] Add error log for failed bulk records in ES sink (#16177)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c327b634135 [improve][es-sink] Add error log for failed bulk records in ES sink (#16177)
c327b634135 is described below

commit c327b634135de93bb96ba6f07a1080c36e83e15c
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Tue Jul 26 10:27:31 2022 +0800

    [improve][es-sink] Add error log for failed bulk records in ES sink (#16177)
---
 .../apache/pulsar/io/elasticsearch/ElasticSearchClient.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index dcbdfbc64fb..67b04529a77 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -76,7 +76,7 @@ public class ElasticSearchClient implements AutoCloseable {
                     final Record record = bulkOperationList.get(index++).getPulsarRecord();
                     if (result.isError()) {
                         record.fail();
-                        checkForIrrecoverableError(result);
+                        checkForIrrecoverableError(record, result);
                     } else {
                         record.ack();
                     }
@@ -106,13 +106,15 @@ public class ElasticSearchClient implements AutoCloseable {
         return irrecoverableError.get() != null;
     }
 
-    void checkForIrrecoverableError(BulkProcessor.BulkOperationResult result) {
+    void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationResult result) {
         if (!result.isError()) {
             return;
         }
         final String errorCause = result.getError();
+        boolean isMalformed = false;
         for (String error : MALFORMED_ERRORS) {
             if (errorCause.contains(error)) {
+                isMalformed = true;
                 switch (config.getMalformedDocAction()) {
                     case IGNORE:
                         break;
@@ -132,6 +134,13 @@ public class ElasticSearchClient implements AutoCloseable {
                 }
             }
         }
+        if (!isMalformed) {
+            log.warn("Bulk request failed, message id=[{}] index={} error={}",
+                    record.getMessage()
+                            .map(m -> m.getMessageId().toString())
+                            .orElse(""),
+                    result.getIndex(), result.getError());
+        }
     }
 
     public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {