You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by ApoorvK <ap...@razorpay.com> on 2020/02/10 07:26:04 UTC

Flink Elasticsearch upsert document in ES

Team, 
Presently I have added elasticsearch as a sink to a stream and inserting the
json data, the problem is when I restore the application in case of crash it
reprocess the data in between (meanwhile a backend application updates the
document in ES) and flink reinsert the document in ES and all update to ES
are lost . 

I am trying for a update or insert in case document not found or do not
insert if document is already there.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Flink Elasticsearch upsert document in ES

Posted by Apoorv Upadhyay <ap...@razorpay.com>.
I have tried by providing opType to elasticsearch builder, I am getting an
error message "document already exists" on my console, but it still updates
the value in elasticsearch

val jsonString = write(record)
val rqst: IndexRequest = Requests.indexRequest
  .index(parameter.get("esIndexName"))
  .`type`(parameter.get("esIndexType"))
  .id(record.getApi_key + "_" + record.getOrder_id)
  .source(jsonString, XContentType.JSON)
    .opType(OpType.CREATE)


On Mon, Feb 10, 2020 at 1:42 PM Itamar Syn-Hershko <
itamar@bigdataboutique.com> wrote:

> Hi ApoorvK,
>
> Elasticsearch supports "create" mode while indexing. By default indexing
> will overwrite documents with a the same ID, but you can tell ES to refuse
> overwriting. See op_type in
> https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params
> .
>
> Looking at the Elasticsearch Sink, it doesn't seem like it's implemented
> currently, but it should be relatively easy to add.
>
> On Mon, Feb 10, 2020 at 9:26 AM ApoorvK <ap...@razorpay.com>
> wrote:
>
>> Team,
>> Presently I have added elasticsearch as a sink to a stream and inserting
>> the
>> json data, the problem is when I restore the application in case of crash
>> it
>> reprocess the data in between (meanwhile a backend application updates the
>> document in ES) and flink reinsert the document in ES and all update to ES
>> are lost .
>>
>> I am trying for a update or insert in case document not found or do not
>> insert if document is already there.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>
> --
>
> [image: logo] <https://bigdataboutique.com/>
> Itamar Syn-Hershko
> CTO, Founder
>
> itamar@bigdataboutique.com
> https://bigdataboutique.com
> <https://www.linkedin.com/in/itamar-syn-hershko-78b25013>
> <https://twitter.com/synhershko>
> <https://www.youtube.com/channel/UCBHr7lM2u6SCWPJvcKug-Yg>
>

Re: Flink Elasticsearch upsert document in ES

Posted by Itamar Syn-Hershko <it...@bigdataboutique.com>.
Hi ApoorvK,

Elasticsearch supports "create" mode while indexing. By default indexing
will overwrite documents with a the same ID, but you can tell ES to refuse
overwriting. See op_type in
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-query-params
.

Looking at the Elasticsearch Sink, it doesn't seem like it's implemented
currently, but it should be relatively easy to add.

On Mon, Feb 10, 2020 at 9:26 AM ApoorvK <ap...@razorpay.com>
wrote:

> Team,
> Presently I have added elasticsearch as a sink to a stream and inserting
> the
> json data, the problem is when I restore the application in case of crash
> it
> reprocess the data in between (meanwhile a backend application updates the
> document in ES) and flink reinsert the document in ES and all update to ES
> are lost .
>
> I am trying for a update or insert in case document not found or do not
> insert if document is already there.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

[image: logo] <https://bigdataboutique.com/>
Itamar Syn-Hershko
CTO, Founder

itamar@bigdataboutique.com
https://bigdataboutique.com
<https://www.linkedin.com/in/itamar-syn-hershko-78b25013>
<https://twitter.com/synhershko>
<https://www.youtube.com/channel/UCBHr7lM2u6SCWPJvcKug-Yg>