Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2017/02/01 04:02:51 UTC

[jira] [Comment Edited] (FLINK-5353) Elasticsearch Sink loses well-formed documents when there are malformed documents

    [ https://issues.apache.org/jira/browse/FLINK-5353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847946#comment-15847946 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-5353 at 2/1/17 4:02 AM:
--------------------------------------------------------------------

Hi [~f.pompermaier], first, some clarifications would be helpful:

1. Was the error somehow thrown in {{afterBulk}}, causing the sink to fail immediately? Previously, we only kept the reported error and only rethrew it on {{close()}}, so what your original description suggests seems a bit odd to me.

2. Which Elasticsearch version are you using? We're using the Elasticsearch {{BulkProcessor}} internally, and the expected behaviour is that one or more failures in a bulk should not prevent the other well-formed documents from being properly requested. So it *might* be that an older version of Elasticsearch's {{BulkProcessor}} does not follow this expected behaviour.

3. Could you confirm if the exception was {{ElasticsearchParseException}}? I'm trying to find the exception type for malformed documents.

4. Have you tried setting the {{ignore_malformed}} config for your indices / fields? https://www.elastic.co/guide/en/elasticsearch/reference/current/ignore-malformed.html.
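For reference, enabling {{ignore_malformed}} is a per-field mapping setting. A minimal sketch of such a mapping (the index and field names here are just illustrative, and the exact mapping shape depends on your Elasticsearch version):

```json
{
  "mappings": {
    "properties": {
      "some_numeric_field": {
        "type": "integer",
        "ignore_malformed": true
      }
    }
  }
}
```

With this, a document carrying a non-numeric value in {{some_numeric_field}} is still indexed; only the malformed field itself is skipped.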


Now, about the possible approach:
Given all the different types of exceptions that may occur at Elasticsearch, instead of handling them case by case in the connector code, I think it would be reasonable to have a user-provided handler / callback for failed documents, letting the user decide how to handle each one: either drop it, or re-add it to the {{RequestIndexer}}.
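As a rough illustration of the callback idea, here is a plain-Java sketch. All names in it ({{FailureHandler}}, {{RequestIndexer}}, {{DocumentRequest}}) are hypothetical stand-ins for discussion, not existing Flink or Elasticsearch API:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FailureHandlerSketch {

    /** Stand-in for a single document indexing request. */
    static final class DocumentRequest {
        final String json;
        DocumentRequest(String json) { this.json = json; }
    }

    /** Stand-in for the sink's indexer: collects requests re-added for retry. */
    interface RequestIndexer {
        void add(DocumentRequest request);
    }

    /** The proposed user callback: per failed document, drop it or re-add it. */
    interface FailureHandler {
        void onFailure(DocumentRequest failed, Throwable cause, RequestIndexer indexer);
    }

    /** Runs an example policy: retry transient I/O errors, drop parse errors. */
    static int runExample() {
        List<DocumentRequest> requeued = new ArrayList<>();
        RequestIndexer indexer = requeued::add;

        FailureHandler handler = (failed, cause, idx) -> {
            if (cause instanceof IOException) {
                idx.add(failed); // transient failure: re-add for retry
            }
            // otherwise: e.g. a malformed document, drop it
        };

        handler.onFailure(new DocumentRequest("{\"ok\":1}"),
                          new IOException("timeout"), indexer);
        handler.onFailure(new DocumentRequest("{bad json"),
                          new IllegalArgumentException("parse error"), indexer);
        return requeued.size();
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // prints 1: only the I/O failure was requeued
    }
}
```

The point of the sketch is that the connector stays policy-free: whether a failure is fatal, retriable, or ignorable becomes the user's decision.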

What do you think about this? It would be great to hear from actual ES users so that we can find an optimal, future-proof solution here :-)



> Elasticsearch Sink loses well-formed documents when there are malformed documents
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-5353
>                 URL: https://issues.apache.org/jira/browse/FLINK-5353
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats
>    Affects Versions: 1.1.3
>            Reporter: Flavio Pompermaier
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)