Posted to issues@beam.apache.org by "Etienne Chauchot (JIRA)" <ji...@apache.org> on 2018/11/13 10:21:00 UTC

[jira] [Commented] (BEAM-6052) elasticsearchIO checkForErrors method bug

    [ https://issues.apache.org/jira/browse/BEAM-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684984#comment-16684984 ] 

Etienne Chauchot commented on BEAM-6052:
----------------------------------------

[~kevin_123] thanks for that! You said on Slack that you have a local fix. Feel free to submit a PR to Beam (see https://beam.apache.org/contribute/) and add me or [~timrobertson100] as reviewer.

> elasticsearchIO checkForErrors method bug
> -----------------------------------------
>
>                 Key: BEAM-6052
>                 URL: https://issues.apache.org/jira/browse/BEAM-6052
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>         Environment: beam-sdk-java-io-elasticsearch-2.8.0
>            Reporter: Fred k
>            Assignee: Etienne Chauchot
>            Priority: Minor
>              Labels: easyfix
>
> When I use Write to send a bulk update request to Elasticsearch, the following exception appears:
> {code:java}
> Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
> {code}
> I checked the checkForErrors method and found that it cannot parse a response that contains update items. After I added logic to parse update items, the output looks like this:
> {code:java}
> Caused by: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
> Document id 1465285334751e039cc4883a8a270191: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id e2722c653c65a4cb119e9b8dc44e37ad: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id b25472e3665695c49861f6eceee5531a: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
> Document id 022c1accdae82f6fe4108ba7989732fc: rejected execution of org.elasticsearch.transport.TransportService$7@6c8edc37 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 198, completed tasks = 10324166]] (es_rejected_execution_exception)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:215)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1235)
>     at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.processElement(ElasticsearchIO.java:1199)
> {code}
> The response content looks like this:
> {
>     "took": 293,
>     "errors": true,
>     "items": [
>     {
>         "update": {
>             "_index": "test_kevin_2018-11",
>             "_type": "kevin",
>             "_id": "8d7664286c0887c637229166c7c613bc",
>             "_version": 1,
>             "result": "noop",
>             "_shards": {
>                 "total": 1,
>                 "successful": 1,
>                 "failed": 0
>             },
>             "status": 200
>         }
>     },
>     {
>         "update": {
>             "_index": "test_kevin_2018-11",
>             "_type": "kevin",
>             "_id": "49952be98f4fc160f56bcdb33b1dbf7e",
>             "status": 429,
>             "error": {
>                 "type": "es_rejected_execution_exception",
>                 "reason": "rejected execution of org.elasticsearch.transport.TransportService$7@3f70bbe7 on EsThreadPoolExecutor[name = gjzx159-node2/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@56b9faa3[Running, pool size = 40, active threads = 40, queued tasks = 200, completed tasks = 10034174]]"
>             }
>         }
>     }
>     ]
> }
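
For illustration only, here is a minimal sketch of one way to collect errors from a bulk response whose items are keyed by any action (index, create, update or delete), as in the response quoted above. It is not the ElasticsearchIO implementation and not the reporter's local fix. The class name BulkErrorCheck and the method collectErrors are hypothetical, and the sketch assumes the response has already been parsed into nested java.util maps and lists by some JSON library. The message format mirrors the "Document id ...: reason (type)" lines in the reported output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam: scans a parsed bulk response for failed items.
public class BulkErrorCheck {

    /**
     * Returns one message per failed item. The response is assumed to be
     * already parsed into nested maps and lists (an assumption of this sketch).
     */
    @SuppressWarnings("unchecked")
    public static List<String> collectErrors(Map<String, Object> response) {
        List<String> messages = new ArrayList<>();
        // "errors" is true only when at least one item failed.
        if (!Boolean.TRUE.equals(response.get("errors"))) {
            return messages;
        }
        List<Map<String, Object>> items = (List<Map<String, Object>>) response.get("items");
        if (items == null) {
            return messages;
        }
        for (Map<String, Object> item : items) {
            // Each item holds a single entry keyed by its action name.
            // Reading whichever entry is present avoids depending on one action name.
            for (Object value : item.values()) {
                Map<String, Object> result = (Map<String, Object>) value;
                Map<String, Object> error = (Map<String, Object>) result.get("error");
                if (error != null) {
                    messages.add(String.format("Document id %s: %s (%s)",
                            result.get("_id"), error.get("reason"), error.get("type")));
                }
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        // Shortened stand-in for the response quoted in the issue: one noop, one rejection.
        Map<String, Object> ok = Map.of("_id", "8d7664286c0887c637229166c7c613bc", "status", 200);
        Map<String, Object> error = Map.of(
                "type", "es_rejected_execution_exception",
                "reason", "rejected execution");
        Map<String, Object> failed = Map.of(
                "_id", "49952be98f4fc160f56bcdb33b1dbf7e", "status", 429, "error", error);
        Map<String, Object> okItem = Map.of("update", ok);
        Map<String, Object> failedItem = Map.of("update", failed);
        Map<String, Object> response = Map.of("errors", true, "items", List.of(okItem, failedItem));

        for (String message : collectErrors(response)) {
            System.out.println(message);
        }
    }
}
```

Because the sketch reads the single entry of each item instead of looking up a fixed action name, the same loop should cover index, create, update and delete items. Whether that fits the existing checkForErrors code would need to be checked against the Beam source.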



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)