Posted to user@flink.apache.org by criss <ct...@gmail.com> on 2016/11/11 13:09:49 UTC

Kafka Stream to Database batch inserts

Hello,

I'm new to Flink and I need some advice regarding the best approach to do
the following:
- read some items from a Kafka topic
- on the Flink stream side, after some simple filtering steps, group these items
in batches by Flink processing time.
- insert the items into a PostgreSQL database using a batch insert.
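The batch-insert step itself can be written with plain JDBC: queue one parameter set per item with addBatch(), send them all with a single executeBatch(), and commit once. This is a minimal sketch, assuming a hypothetical table items with a text column payload; the class and method names are hypothetical too. In a Flink job it would be called once per batch, for example from the sink, rather than being the thread's own method.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Hypothetical helper: writes one batch of items in a single JDBC transaction.
final class BatchInsert {

    // Assumed table and column names; adjust to the real schema.
    static final String SQL = "INSERT INTO items (payload) VALUES (?)";

    // Queues one parameter set per item, sends them with one executeBatch(),
    // commits once, and rolls back if anything fails.
    static int[] insertBatch(Connection conn, List<String> items) throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // one commit for the whole batch
        try (PreparedStatement ps = conn.prepareStatement(SQL)) {
            for (String item : items) {
                ps.setString(1, item);
                ps.addBatch();
            }
            int[] counts = ps.executeBatch();
            conn.commit();
            return counts;
        } catch (SQLException e) {
            try {
                conn.rollback(); // discard the partial batch
            } catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit); // restore the caller's setting
        }
    }
}
```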

I did this by using a time window of 1 second and adding a custom sink that
collects items in a blocking queue. Additionally, I need a separate thread
that triggers the commit to the database after some time, shorter than the
window's time.

The solution works, but I am not very pleased with it because it looks very
complicated for a simple item-batching task.

Is there any way to trigger the commit directly when the window is closed? I
didn't find any way to get notified when the window is complete. I would like
to get rid of this separate thread that exists only to trigger the batch
insert.

Any other possible solution would be highly appreciated. :)
Thanks



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Stream-to-Database-batch-inserts-tp10036.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Kafka Stream to Database batch inserts

Posted by criss <ct...@gmail.com>.
Hi,

Thank you very much for your hint! It works pretty well.





Re: Kafka Stream to Database batch inserts

Posted by Ufuk Celebi <uc...@apache.org>.
You can specify a custom trigger that extends the default ProcessingTimeTrigger (if you are working with processing time) or EventTimeTrigger (if you are working with event time).

You do it like this:

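stream.timeWindow(Time.of(1, TimeUnit.SECONDS)).trigger(new MyTrigger());

(timeWindow is defined on a KeyedStream; on a non-keyed DataStream the equivalent is timeWindowAll.)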

Check out the Trigger implementations to get the behaviour you want ([1], [2], [3]).
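To illustrate why no separate commit thread is needed once the window drives the write, here is a plain-Java model of what a processing-time tumbling window does when its trigger fires at the window's end. It has no Flink dependency, so it is a simulation of the control flow and not Flink's API: elements are buffered per window, and when the clock passes the window's last millisecond the complete batch is handed to one callback and purged. In a Flink job that callback role would be played by the window function (for example one passed to apply(...)) that emits the whole batch, so the sink can run one batch insert per fired window. The class and method names are hypothetical, and the model assumes non-negative millisecond timestamps and a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical model of a processing-time tumbling window whose trigger
// fires exactly once per window, at the window's end, then purges it.
final class TumblingWindowModel<T> {
    private final long sizeMs;
    private final Consumer<List<T>> onFire; // stands in for the window function
    // Window start -> buffered elements, ordered so the oldest window fires first.
    private final TreeMap<Long, List<T>> windows = new TreeMap<>();

    TumblingWindowModel(long sizeMs, Consumer<List<T>> onFire) {
        this.sizeMs = sizeMs;
        this.onFire = onFire;
    }

    // Counterpart of onElement: put the element into [start, start + size)
    // and keep collecting (no firing yet). Assumes nowMs >= 0.
    void onElement(T element, long nowMs) {
        long start = nowMs - (nowMs % sizeMs);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(element);
    }

    // Counterpart of onProcessingTime: each window whose last millisecond
    // (start + size - 1, like TimeWindow.maxTimestamp()) has been reached
    // fires once with all of its elements and is then removed.
    void advanceClock(long nowMs) {
        while (!windows.isEmpty()) {
            Map.Entry<Long, List<T>> oldest = windows.firstEntry();
            long maxTimestamp = oldest.getKey() + sizeMs - 1;
            if (maxTimestamp > nowMs) {
                break;
            }
            windows.pollFirstEntry();
            onFire.accept(oldest.getValue());
        }
    }
}
```

Because the batch arrives complete when the window fires, the callback can write it with a single executeBatch() and commit, with no timer thread that has to guess when the window is done.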

@Kostas: Is there another way?

– Ufuk

[1] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

[2] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

[3] https://github.com/apache/flink/blob/release-1.1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
