Posted to issues@beam.apache.org by "Etienne Chauchot (JIRA)" <ji...@apache.org> on 2018/11/13 10:21:00 UTC
[jira] [Commented] (BEAM-6052) elasticsearchIO checkForErrors method bug
[ https://issues.apache.org/jira/browse/BEAM-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684984#comment-16684984 ]
Etienne Chauchot commented on BEAM-6052:
----------------------------------------
[~kevin_123] thanks for that! You said on Slack that you have a local fix. Feel free to submit a PR to Beam (see https://beam.apache.org/contribute/) and add me or [~timrobertson100] as reviewer.
> elasticsearchIO checkForErrors method bug
> -----------------------------------------
>
> Key: BEAM-6052
> URL: https://issues.apache.org/jira/browse/BEAM-6052
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Environment: beam-sdk-java-io-elasticsearch-2.8.0
> Reporter: Fred k
> Assignee: Etienne Chauchot
> Priority: Minor
> Labels: easyfix
>
> When I use Write to send bulk update requests to Elasticsearch, the following exception appears:
> {code:java}
> Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
> {code}
> I checked the checkForErrors method and found that it cannot parse a response that contains update results. So I added logic to parse update items, and now I can see output like the following:
> {code:java}
> Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
> Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
> at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
> {code}
> The response content looks like this:
> {
>   "took": 293,
>   "errors": true,
>   "items": [
>     {
>       "update": {
>         "_index": "test_kevin_2018-11",
>         "_type": "kevin",
>         "_id": "8d7664286c0887c637229166c7c613bc",
>         "_version": 1,
>         "result": "noop",
>         "_shards": {
>           "total": 1,
>           "successful": 1,
>           "failed": 0
>         },
>         "status": 200
>       }
>     },
>     {
>       "update": {
>         "_index": "test_kevin_2018-11",
>         "_type": "kevin",
>         "_id": "49952be98f4fc160f56bcdb33b1dbf7e",
>         "status": 429,
>         "error": {
>           "type": "es_rejected_execution_exception",
>           "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]"
>         }
>       }
>     }
>   ]
> }
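
For illustration only, here is a minimal sketch of how error extraction could be made independent of the bulk action type, so that "update" items are reported the same way as "index" items. It is not the actual ElasticsearchIO code and not the reporter's local fix: the class name BulkErrorSketch and the method collectErrors are hypothetical, and the sketch assumes the response has already been parsed into nested Maps and Lists by some JSON library. The message format mirrors the "Document id ...: reason (type)" lines quoted in the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the actual ElasticsearchIO implementation. */
final class BulkErrorSketch {

  /**
   * Collects one message per failed item of a bulk response that has already
   * been parsed into nested Maps and Lists (an assumption of this sketch).
   */
  @SuppressWarnings("unchecked")
  static List<String> collectErrors(Map<String, Object> response) {
    List<String> messages = new ArrayList<>();
    // The top-level "errors" flag is true only if at least one item failed.
    if (!Boolean.TRUE.equals(response.get("errors"))) {
      return messages;
    }
    List<Map<String, Object>> items = (List<Map<String, Object>>) response.get("items");
    if (items == null) {
      return messages;
    }
    for (Map<String, Object> item : items) {
      // Each item holds a single entry keyed by its action (index, create,
      // update or delete), so iterate over the entries instead of looking
      // up one hard-coded action name.
      for (Map.Entry<String, Object> action : item.entrySet()) {
        Map<String, Object> result = (Map<String, Object>) action.getValue();
        Map<String, Object> error = (Map<String, Object>) result.get("error");
        if (error != null) {
          messages.add(
              String.format(
                  "Document id %s: %s (%s)",
                  result.get("_id"), error.get("reason"), error.get("type")));
        }
      }
    }
    return messages;
  }

  public static void main(String[] args) {
    // Shortened stand-ins for the two items in the response above (Java 9+ for Map.of).
    Map<String, Object> ok = Map.of("update", Map.of("_id", "a", "status", 200));
    Map<String, Object> rejected =
        Map.of(
            "update",
            Map.of(
                "_id", "b",
                "status", 429,
                "error",
                    Map.of("type", "es_rejected_execution_exception", "reason", "queue full")));
    Map<String, Object> response =
        Map.of("took", 293, "errors", true, "items", List.of(ok, rejected));
    collectErrors(response).forEach(System.out::println);
  }
}
```

Iterating over the entries of each item, rather than reading a fixed "index" key, is the design choice that lets the same loop handle update, create and delete results; a real fix would also have to fit the JSON parsing already used inside checkForErrors.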