Posted to issues@flink.apache.org by static-max <gi...@git.apache.org> on 2017/01/03 10:16:12 UTC

[GitHub] flink pull request #2861: [FLINK-5122] Index requests will be retried if the...

Github user static-max commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2861#discussion_r94382504
  
    --- Diff: flink-streaming-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java ---
    @@ -186,22 +198,47 @@ public void beforeBulk(long executionId, BulkRequest request) {
     
     			@Override
     			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    +				boolean allRequestsRepeatable = true;
     				if (response.hasFailures()) {
     					for (BulkItemResponse itemResp : response.getItems()) {
     						if (itemResp.isFailed()) {
    -							LOG.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
    -							failureThrowable.compareAndSet(null, new RuntimeException(itemResp.getFailureMessage()));
    +							// Check if index request can be retried
    +							String failureMessageLowercase = itemResp.getFailureMessage().toLowerCase();
    +							if (checkErrorAndRetryBulk && (
    +									failureMessageLowercase.contains("timeout") || failureMessageLowercase.contains("timed out") // Generic timeout errors
    +									|| failureMessageLowercase.contains("UnavailableShardsException".toLowerCase()) // Shard not available due to rebalancing or node down
    +									|| (failureMessageLowercase.contains("data/write/bulk") && failureMessageLowercase.contains("bulk")) // Bulk index queue on node full
    +									)
    +								) {
    +								LOG.debug("Retry bulk: {}", itemResp.getFailureMessage());
    +								reAddBulkRequest(request);
    --- End diff --
    
    You're right, it gets added multiple times, I'll fix that.
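
One possible shape for the fix, as a minimal sketch rather than the actual patch: classify each failed item inside the loop, then re-add the bulk request at most once after the loop. The class `RetryOnceSketch`, the helpers `isRetryable` and `afterBulk`, and the use of plain failure-message strings in place of `BulkItemResponse` are all hypothetical stand-ins so the example runs without Elasticsearch. It also assumes, based on the `allRequestsRepeatable` flag in the diff, that the bulk is retried only when every failed item is retryable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the code from the pull request.
final class RetryOnceSketch {

    // Mirrors the string checks in the diff. The original
    // contains("data/write/bulk") && contains("bulk") reduces to the first
    // check alone, because "data/write/bulk" already contains "bulk".
    static boolean isRetryable(String failureMessage) {
        String msg = failureMessage.toLowerCase(Locale.ROOT);
        return msg.contains("timeout")
                || msg.contains("timed out")
                || msg.contains("unavailableshardsexception")
                || msg.contains("data/write/bulk");
    }

    // failureMessages stands in for the messages of the failed bulk items.
    // reAddBulkRequest stands in for reAddBulkRequest(request) in the diff.
    // Returns true if the bulk was re-added.
    static boolean afterBulk(List<String> failureMessages, Runnable reAddBulkRequest) {
        if (failureMessages.isEmpty()) {
            return false; // nothing failed, nothing to retry
        }
        boolean allRequestsRepeatable = true;
        for (String message : failureMessages) {
            if (!isRetryable(message)) {
                // Assumption: one non-retryable failure rules out a retry.
                allRequestsRepeatable = false;
                break;
            }
        }
        if (allRequestsRepeatable) {
            // Called once per bulk, outside the loop, not once per failed item.
            reAddBulkRequest.run();
        }
        return allRequestsRepeatable;
    }

    public static void main(String[] args) {
        AtomicInteger reAdds = new AtomicInteger();
        List<String> failures = Arrays.asList(
                "Request timed out",
                "UnavailableShardsException[primary shard is not active]");
        afterBulk(failures, reAdds::incrementAndGet);
        System.out.println("re-added " + reAdds.get() + " time(s)");
    }
}
```

Keeping the re-add outside the loop means the number of failed items no longer affects how many copies of the request are queued.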

