You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Etienne Chauchot (JIRA)" <ji...@apache.org> on 2017/11/23 15:37:00 UTC

[jira] [Comment Edited] (BEAM-3201) ElasticsearchIO should deal with documents id

    [ https://issues.apache.org/jira/browse/BEAM-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264477#comment-16264477 ] 

Etienne Chauchot edited comment on BEAM-3201 at 11/23/17 3:36 PM:
------------------------------------------------------------------

[~nerdynick] regarding what Chet said, and with the knowledge about your use case that you gave, have you tried the {{Partition}} transform?  It allows to "split" a PCollection (your unbounded one got after reading from kafka) into several based on a user defined function, allowing then to plug different {{ESIO.Write()}} transforms to write to different index/type. The only thing is that it requires to specify the number of partitions when applying {{Partition}} transform. 

If it does not fit, then I prefer having 3 user defined functions that specify id, type and index values rather than adding metadata fields to the element stored in the PCollection. Adding them will not break the backward compatibility because {{with[id|type|index]Fn}} will be optional. In the end, at writing time, of course, the payload of the ES doc will not contain the metadata fields, they will be in the regular metadata bulk place 
{code}
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{code}

Besides keeping PCollection<String> is mainly to avoid breaking the API. You said "Doing the work to deserialize, add fields, serialize is a lot of extra work "; in the user functions you will not need to add fields or serialize because the fields would not be in the output payload. You would just need to parse the json string to determine the id, type, index names out of the json on each call to {{processElement}} in the {{ESIO.WriteFn}}. To parse you can use JSonPath of course.


was (Author: echauchot):
[~nerdynick] regarding what Chet said, and with the knowledge about your use case that you gave, have you tried the {{Partition}} transform?  It allows to "split" a PCollection (your unbounded one got after reading from kafka) into several based on a user defined function, allowing then to plug different {{ESIO.Write()}} transforms to write to different index/type. The only thing is that it requires to specify the number of partitions when apply {{Partition}} transform. 

If it does not fit, then I prefer having 3 user defined functions that specify id, type and index values rather than adding metadata fields to the element stored in the PCollection. Adding them will not break the backward compatibility because {{with[id|type|index]Fn}} will be optional. In the end, at writing time, of course, the payload of the ES doc will not contain the metadata fields, they will be in the regular metadata bulk place 
{code}
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{code}

Besides keeping PCollection<String> is mainly to avoid breaking the API. You said "Doing the work to deserialize, add fields, serialize is a lot of extra work "; in the user functions you will not need to add fields because the fields would not be in the output payload. You would just need to parse the json string to determine the id, type, index names out of the json on each call to {{processElement}} in the {{ESIO.WriteFn}}. To parse you can use JSonPath of course.

> ElasticsearchIO should deal with documents id
> ---------------------------------------------
>
>                 Key: BEAM-3201
>                 URL: https://issues.apache.org/jira/browse/BEAM-3201
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Etienne Chauchot
>            Assignee: Chet Aldrich
>
> Today the ESIO only inserts the payload of the ES documents. Elasticsearch generates a document id for each record inserted. So each new insertion is considered as a new document. Users want to be able to update documents using the IO. So, for the write part of the IO, users should be able to provide a document id so that they could update already stored documents. Providing an id for the documents could also help the user on indempotency.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)