You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/05 13:37:06 UTC

[GitHub] [beam] Ironlink opened a new issue, #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Ironlink opened a new issue, #22160:
URL: https://github.com/apache/beam/issues/22160

   ### What happened?
   
   It seems to me that the retry functionality in ElasticsearchIO does not work in my batch pipeline running on Dataflow. Here is an extract from my logs:
   ```
   org.apache.beam.sdk.util.UserCodeException: org.elasticsearch.client.ResponseException: method [POST], host [http://10.x.y.z:9200], URI [/my_index/_bulk], status line [HTTP/1.1 429 Too Many Requests]
   {"error":{"root_cause":[{"type":"es_rejected_execution_exception","reason":"rejected execution of coordinating operation [coordinating_and_primary_bytes=663765981, replica_bytes=0, all_bytes=663765981, coordinating_operation_bytes=47402063, max_coordinating_and_primary_bytes=675282944]"}],"type":"es_rejected_execution_exception","reason":"rejected execution of coordinating operation [coordinating_and_primary_bytes=663765981, replica_bytes=0, all_bytes=663765981, coordinating_operation_bytes=47402063, max_coordinating_and_primary_bytes=675282944]"},"status":429}
   	at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBundleFn$DoFnInvoker.invokeProcessElement(Unknown Source)
   	at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:779)
   	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
   	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
   	at org.apache.beam.fn.harness.MapFnRunners$ExplodedWindowedValueMapperFactory.lambda$create$0(MapFnRunners.java:164)
   	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:255)
   	at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:209)
   	at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1769)
   	at org.apache.beam.fn.harness.FnApiDoFnRunner.access$2800(FnApiDoFnRunner.java:142)
   	at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2338)
   	at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2508)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn.processElement(ElasticsearchIO.java:1638)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$DocToBulk$DocToBulkFn$DoFnInvoker.invokeProcessElement(Unknown Source)
   <<snip>>
   Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://10.x.y.z:9200], URI [/my_index/_bulk], status line [HTTP/1.1 429 Too Many Requests]
   {"error":{"root_cause":[{"type":"es_rejected_execution_exception","reason":"rejected execution of coordinating operation [coordinating_and_primary_bytes=663765981, replica_bytes=0, all_bytes=663765981, coordinating_operation_bytes=47402063, max_coordinating_and_primary_bytes=675282944]"}],"type":"es_rejected_execution_exception","reason":"rejected execution of coordinating operation [coordinating_and_primary_bytes=663765981, replica_bytes=0, all_bytes=663765981, coordinating_operation_bytes=47402063, max_coordinating_and_primary_bytes=675282944]"},"status":429}
   	at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:346)
   	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:312)
   	at org.elasticsearch.client.RestClient.performRequest(RestClient.java:287)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushBatch(ElasticsearchIO.java:2502)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2435)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2448)
   	at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBundleFn.processElement(ElasticsearchIO.java:2333)
   ```
   
   The Elasticsearch `RestClient` throws an exception when the response status is an error, unless a special HTTP request parameter called "ignore" is defined and includes the status code in question.
   
   I've created this reproducer:
   * Code: https://gitlab.com/joakim.edenholm/apache-beam-repro/-/tree/elastic-throttling
   * CI logs: https://gitlab.com/joakim.edenholm/apache-beam-repro/-/jobs/2679857923
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: io-java-elasticsearch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] egalpin commented on issue #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #22160:
URL: https://github.com/apache/beam/issues/22160#issuecomment-1182614344

   @Ironlink  can you comment on the size of your ES cluster? Seems there’s reports of this issue in highly resource constrained environments: https://discuss.elastic.co/t/es-rejected-execution-exception-what-steps-do-i-need-to-take/306836 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Ironlink commented on issue #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Posted by GitBox <gi...@apache.org>.
Ironlink commented on issue #22160:
URL: https://github.com/apache/beam/issues/22160#issuecomment-1175096348

   @egalpin I don't think I can, as the predicate receives the response body as its input, and the `RestClient` throws an exception (which is caught and rethrown). As such, the response body is not available to pass into the predicate. Besides, the RetryPredicate is package-private and marked `@VisibleForTesting`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] egalpin commented on issue #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #22160:
URL: https://github.com/apache/beam/issues/22160#issuecomment-1175075698

   Thanks for the report and the repro (super helpful to have!).  Definitely 429 is something that's intended, and important, to handle.  I can investigate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Ironlink commented on issue #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Posted by GitBox <gi...@apache.org>.
Ironlink commented on issue #22160:
URL: https://github.com/apache/beam/issues/22160#issuecomment-1182979277

   @egalpin I currently have:
   * 6.5 TBs of source documents
   * 8.5 TBs of primary shard data (measured as index disk usage when index has no replicas)
   * 6 data nodes with 29 GBs of machine memory each (3 GBs lost to overhead)
   * 2 coordinator nodes with 13 GBs of machine memory each (3 GBs lost to overhead)
   
   In my pipeline, I have a maximum bulk load size of 45 MBs, and a maximum number of workers of 20 which results in 40 threads on Dataflow. 40 threads times 45 MBs gives 1 800 MBs of concurrent indexing data. Bulk requests are load balanced between the two coordinator nodes, which split and route the requests to respective data nodes based on document routing.
   
   By default, the ES coordinator nodes had allocated 50 % of memory to application heap, and the other 50 % to OS file system caching. As noted by the linked blog entry, the default limit for concurrent indexing data is 10 % of the application heap. In absolute numbers, this means the default limit for concurrent indexing data was about 600 MBs per coordinator node.
   
   I came across the linked blog entry as part of my troubleshooting, and adjusted my coordinator nodes as a result. Specifically, I raised the heap size from 6 GBs to 10 GBs, and raised `indexing_pressure.memory.limit` from 10 % to 20 % since the coordinators do nothing but split and forward requests. With this, the limit per coordinator becomes 2 GBs. I have not yet had the time to run my pipeline with this configuration, but it seems likely that this will work without any rejected requests since each coordinator on its own is able to fit the full 1 800 MBs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] egalpin commented on issue #22160: [Bug]: Java ElasticsearchIO fails to handle retry on HTTP 429

Posted by GitBox <gi...@apache.org>.
egalpin commented on issue #22160:
URL: https://github.com/apache/beam/issues/22160#issuecomment-1175088000

   My initial thoughts are that the error contents may be different from that expected by the default retry predicate[1].  It looks to me that the default predicate was implemented on the assumption that the error would arise from a partially successful bulk indexing operation.  The error in the initial report here arises from cluster coordination.
   
   @Ironlink Are you able to change the behaviour that you're seeing via creation of a custom RetryPredicate?
   
   [1] https://github.com/apache/beam/blob/b53b16f1fb41913b0e8bbe9d64d71b8e3ebfbbf6/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1177-L1191


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org