Posted to user@flink.apache.org by aj <aj...@gmail.com> on 2020/05/28 18:43:03 UTC

Flink Elastic Sink

Hello All,

I am receiving many events in Kafka, and I have written a Flink job that sinks
the Avro records from Kafka to S3 in Parquet format.

Now, I want to sink these records into Elasticsearch. The only challenge is
that I want to sink the records into time-based indices: basically, in
Elasticsearch I want to create a per-day index, with the date as the suffix.
So, in a Flink streaming job, if I create an ES sink, how can I make the sink
start writing to a new index when the first event of the day arrives?

Thanks,
Anuj.



Re: Flink Elastic Sink

Posted by Leonard Xu <xb...@gmail.com>.
Hi, aj

> I was confused before, as I was thinking the sink builder is called only once, but it gets called for every batch request; correct me if my understanding is wrong.

You’re right: the sink builder should be called only once, not for every batch request. Could you post a snippet of the code where you use the sink?

Best,
Leonard Xu



> [earlier quoted messages trimmed]

Re: Flink Elastic Sink

Posted by aj <aj...@gmail.com>.
Thanks, it worked.

I was confused before, as I was thinking the sink builder is called only
once, but it gets called for every batch request; correct me if my
understanding is wrong.

On Fri, May 29, 2020 at 9:08 AM Leonard Xu <xb...@gmail.com> wrote:

> Hi,aj
>
> In the implementation of ElasticsearchSink, ElasticsearchSink  won't
> create index and only start a Elastic client for sending requests to
> the Elastic cluster. You can simply extract the index(date value in your
> case) from your timestamp field and then put it to an IndexRequest[2],
>  ElasticsearchSink will send the IndexRequests to the Elastic cluster,
> Elastic cluster will create corresponding index and flush the records.
>
> BTW, If you’re using Flink SQL you can use dynamic index in Elasticsearch
> sql connector [2], you can simply config 'connector.index' =
> ‘myindex_{ts_field|yyyy-MM-dd}’ to achieve your goals.
>
> Best,
> Leoanrd Xu
> [1]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
>
>
>
>
> 在 2020年5月29日,02:43,aj <aj...@gmail.com> 写道:
>
> Hello All,
>
> I am getting many events in Kafka and I have written a link job that sinks
> that Avro records from Kafka to S3 in parquet format.
>
> Now, I want to sink these records into elastic search. but the only
> challenge is that I want to sink record on time indices. Basically, In
> Elastic, I want to create a per day index with the date as the suffix.
> So in Flink stream job if I create an es sink how will I change the sink
> to start writing  in a new index when the first event of the day arrives
>
> Thanks,
> Anuj.
>
>
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>
>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07

Re: Flink Elastic Sink

Posted by Leonard Xu <xb...@gmail.com>.
Hi, aj

In the implementation of ElasticsearchSink, the sink won't create the index itself; it only starts an Elasticsearch client for sending requests to
the Elasticsearch cluster. You can simply extract the index name (the date value, in your case) from your timestamp field and put it into an IndexRequest [1]. ElasticsearchSink will send the IndexRequests to the Elasticsearch cluster, and the cluster will create the corresponding index and flush the records.

BTW, if you’re using Flink SQL, you can use a dynamic index in the Elasticsearch SQL connector [2]: simply configure 'connector.index' = 'myindex_{ts_field|yyyy-MM-dd}' to achieve your goal.

Best,
Leonard Xu
[1] https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
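As a rough sketch of the index-name derivation described above (plain Java with no Flink dependencies; the base name `myindex` and an epoch-millis timestamp field are assumptions for illustration), the string you would pass when building the IndexRequest inside your sink function could be computed like this:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DailyIndex {
    // UTC day formatter; adjust the zone if your indices should follow a local calendar.
    static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Build the per-day index name from the event's epoch-millis timestamp.
    static String indexFor(String base, long epochMillis) {
        return base + "_" + DAY.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2020-05-28T18:43:03Z
        System.out.println(indexFor("myindex", 1590691383000L)); // myindex_2020-05-28
    }
}
```

Inside the ElasticsearchSinkFunction, this string would go into the index of each request, along the lines of the linked example [1].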



> On May 29, 2020, at 02:43, aj <aj...@gmail.com> wrote:
> [quoted message trimmed]

Re: Flink Elastic Sink

Posted by Yangze Guo <ka...@gmail.com>.
Hi, Anuj.

From my understanding, you can send an IndexRequest to the indexer in
`ElasticsearchSink`, and it will create a document under the given index
and type. So it seems you only need to extract the timestamp and
concatenate the date onto your index name. Am I understanding that
correctly? Or do you want to emit only one record per day?
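Since the index name is derived per record from the event's own timestamp, there is no explicit "switch to a new index" step: the first event after midnight simply resolves to the new day's name. A minimal sketch (plain Java, no Flink dependencies; the base name `myindex` and millisecond timestamps are assumptions) counting events per derived index across a day boundary:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

public class DailyRouting {
    static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    // Derive the target index per event and count how many land in each;
    // no rollover logic is needed, since each record carries its own timestamp.
    static Map<String, Integer> countPerIndex(String base, long[] epochMillis) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (long ts : epochMillis) {
            String index = base + "_" + DAY.format(Instant.ofEpochMilli(ts));
            counts.merge(index, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long midnight = 1590624000000L; // 2020-05-28T00:00:00Z
        // One event just before midnight UTC, two after it.
        long[] events = {midnight - 1000L, midnight, midnight + 3_600_000L};
        System.out.println(countPerIndex("myindex", events));
        // {myindex_2020-05-27=1, myindex_2020-05-28=2}
    }
}
```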

Best,
Yangze Guo

On Fri, May 29, 2020 at 2:43 AM aj <aj...@gmail.com> wrote:
> [quoted message trimmed]