You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/26 02:27:39 UTC
[pulsar] branch master updated: [improve][es-sink] Add error log for failed bulk records in ES sink (#16177)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c327b634135 [improve][es-sink] Add error log for failed bulk records in ES sink (#16177)
c327b634135 is described below
commit c327b634135de93bb96ba6f07a1080c36e83e15c
Author: Yang Yang <yy...@streamnative.io>
AuthorDate: Tue Jul 26 10:27:31 2022 +0800
[improve][es-sink] Add error log for failed bulk records in ES sink (#16177)
---
.../apache/pulsar/io/elasticsearch/ElasticSearchClient.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index dcbdfbc64fb..67b04529a77 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -76,7 +76,7 @@ public class ElasticSearchClient implements AutoCloseable {
final Record record = bulkOperationList.get(index++).getPulsarRecord();
if (result.isError()) {
record.fail();
- checkForIrrecoverableError(result);
+ checkForIrrecoverableError(record, result);
} else {
record.ack();
}
@@ -106,13 +106,15 @@ public class ElasticSearchClient implements AutoCloseable {
return irrecoverableError.get() != null;
}
- void checkForIrrecoverableError(BulkProcessor.BulkOperationResult result) {
+ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationResult result) {
if (!result.isError()) {
return;
}
final String errorCause = result.getError();
+ boolean isMalformed = false;
for (String error : MALFORMED_ERRORS) {
if (errorCause.contains(error)) {
+ isMalformed = true;
switch (config.getMalformedDocAction()) {
case IGNORE:
break;
@@ -132,6 +134,13 @@ public class ElasticSearchClient implements AutoCloseable {
}
}
}
+ if (!isMalformed) {
+ log.warn("Bulk request failed, message id=[{}] index={} error={}",
+ record.getMessage()
+ .map(m -> m.getMessageId().toString())
+ .orElse(""),
+ result.getIndex(), result.getError());
+ }
}
public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception {