Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2018/10/01 09:27:04 UTC

Re: In-Memory Lookup in Flink Operators

Hi Chirag,

Flink 1.5.0 added support for BroadcastState which should address your
requirement of replicating the data.  [1]
The replicated data is stored in the configured state backend which can
also be RocksDB.

Regarding the reload, I would recommend Lasse's approach of having a custom
source that pushes data at regular intervals instead.
One problem is that it is not possible to pause a stream until all data is
loaded. Instead, you would need to buffer that data in state as well and
work with start and end markers on the broadcast stream.
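
To make that concrete, here is a minimal sketch of the buffering pattern
with a KeyedBroadcastProcessFunction (the Event, LookupUpdate, and Result
types, the end marker, the join() helper, and the events/lookupStream
variables are hypothetical placeholders; Flink imports are omitted):

    MapStateDescriptor<String, String> lookupDesc =
        new MapStateDescriptor<>("lookup", Types.STRING, Types.STRING);

    events.keyBy(e -> e.key)
        .connect(lookupStream.broadcast(lookupDesc))
        .process(new KeyedBroadcastProcessFunction<String, Event, LookupUpdate, Result>() {

            // Keyed buffer for events that arrive before the table is complete.
            private transient ListState<Event> buffer;

            @Override
            public void open(Configuration conf) {
                buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", Event.class));
            }

            @Override
            public void processBroadcastElement(LookupUpdate u, Context ctx,
                    Collector<Result> out) throws Exception {
                BroadcastState<String, String> table = ctx.getBroadcastState(lookupDesc);
                if (u.isEndMarker()) {
                    table.put("__ready__", "");       // end marker: load is complete
                } else {
                    table.put(u.key, u.value);
                }
            }

            @Override
            public void processElement(Event e, ReadOnlyContext ctx,
                    Collector<Result> out) throws Exception {
                ReadOnlyBroadcastState<String, String> table =
                    ctx.getBroadcastState(lookupDesc);
                if (!table.contains("__ready__")) {
                    buffer.add(e);                    // not loaded yet: buffer in state
                    return;
                }
                for (Event buffered : buffer.get()) { // flush this key's buffer first
                    out.collect(join(buffered, table));
                }
                buffer.clear();
                out.collect(join(e, table));
            }
        });

Note that a key's buffer is only flushed when the next event for that key
arrives; flushing eagerly on the end marker would need
Context#applyToKeyedState.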

Best, Fabian

[1]
https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink


On Sun, Sep 30, 2018 at 10:48 AM Chirag Dewan <
chirag.dewan22@yahoo.in> wrote:

> Thanks Lasse, that is rightly put. That's the only solution I can think of
> too.
>
> The only thing I can't get my head around is using the coMap and
> coFlatMap functions with such a stream. Since they don't support side
> outputs, is there a way my lookup map/flatMap function can simply consume a
> stream?
>
> Ken, that's an interesting solution actually. Is there any chance you need
> to update the in-memory data too?
>
> Thanks,
>
> Chirag
>
> On Sunday, 30 September, 2018, 5:17:51 AM IST, Ken Krugler <
> kkrugler_lists@transpac.com> wrote:
>
>
> Hi Lasse,
>
> One approach I’ve used in a similar situation is to have a “UnionedSource”
> wrapper that first emits the (bounded) data that will be loaded in memory,
> and then starts running the source that emits the continuous stream of data.
>
> This outputs an Either<A, B>, which I then split; I broadcast the A side
> and key/partition the B side.
>
> You could do something similar, but occasionally keep checking if there’s
> more <A> data vs. assuming it’s bounded.
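>
> In rough Java, the idea is something like this (just a sketch;
> LookupRecord, Event, the fetch helpers, and env are hypothetical, and
> checkpointing is ignored):
>
>     public class UnionedSource extends RichSourceFunction<Either<LookupRecord, Event>> {
>         private volatile boolean running = true;
>
>         @Override
>         public void run(SourceContext<Either<LookupRecord, Event>> ctx) throws Exception {
>             // First: emit the bounded lookup data.
>             for (LookupRecord r : fetchLookupData()) {       // hypothetical loader
>                 ctx.collect(Either.Left(r));
>             }
>             // Then: run the continuous part of the source.
>             while (running) {
>                 ctx.collect(Either.Right(fetchNextEvent())); // hypothetical fetch
>             }
>         }
>
>         @Override
>         public void cancel() { running = false; }
>     }
>
>     DataStream<Either<LookupRecord, Event>> unioned = env.addSource(new UnionedSource());
>     DataStream<LookupRecord> lookups =
>         unioned.filter(Either::isLeft).map(Either::left).returns(LookupRecord.class);
>     DataStream<Event> events =
>         unioned.filter(Either::isRight).map(Either::right).returns(Event.class);
>     // broadcast the lookups, keyBy the events downstream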
>
> The main issue I ran into is that it doesn’t seem possible to do
> checkpointing, or at least I couldn’t think of a way to make this work
> properly.
>
> — Ken
>
>
> On Sep 27, 2018, at 9:50 PM, Lasse Nedergaard <la...@gmail.com>
> wrote:
>
> Hi.
>
> We have created our own database source that polls the data at a
> configured interval. We then use a co-process function with two inputs:
> one from our database and one from our data stream. It requires that you
> keyBy the attributes you use for the lookup in your map function.
> Delaying your data input until the first database lookup has completed is
> not simple, but a simple solution could be to implement a delay operation,
> or to keep the data in your process function until data arrives from your
> database stream.
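>
> A minimal sketch of this pattern (LookupRow, Event, Result, and the
> enrich() helper are hypothetical; both streams are keyed by the lookup
> attribute):
>
>     DataStream<Result> results = events.keyBy(e -> e.lookupKey)
>         .connect(dbRows.keyBy(r -> r.lookupKey))
>         .process(new CoProcessFunction<Event, LookupRow, Result>() {
>
>             private transient ValueState<LookupRow> lookup;
>             private transient ListState<Event> pending;
>
>             @Override
>             public void open(Configuration conf) {
>                 lookup = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("lookup", LookupRow.class));
>                 pending = getRuntimeContext().getListState(
>                     new ListStateDescriptor<>("pending", Event.class));
>             }
>
>             @Override
>             public void processElement1(Event event, Context ctx,
>                     Collector<Result> out) throws Exception {
>                 LookupRow row = lookup.value();
>                 if (row == null) {
>                     pending.add(event);              // no lookup data yet: hold back
>                 } else {
>                     out.collect(enrich(event, row));
>                 }
>             }
>
>             @Override
>             public void processElement2(LookupRow row, Context ctx,
>                     Collector<Result> out) throws Exception {
>                 lookup.update(row);                  // latest value from the poll
>                 for (Event e : pending.get()) {      // release buffered events
>                     out.collect(enrich(e, row));
>                 }
>                 pending.clear();
>             }
>         });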
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On Sep 28, 2018, at 06:28, Chirag Dewan <ch...@yahoo.in> wrote:
>
> Hi,
>
> I saw "static/dynamic lookups in flink streaming"
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/static-dynamic-lookups-in-flink-streaming-td10726.html>
> being discussed in the Apache Flink User Mailing List archive, and then I
> saw this FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
> I know we haven't made much progress on this topic. I still wanted to put
> forward my problem statement around it.
>
> I am also looking for a dynamic lookup in Flink operators. I actually want
> to pre-fetch various data sources, like a DB, a filesystem, Cassandra, etc.,
> into memory. Along with that, I have to refresh the in-memory lookup table
> periodically, the period being a configurable parameter.
>
> This is what a map operator would look like with lookup:
>
> -> Load in-memory lookup - Refresh timer start
> -> Stream processing start
> -> Call lookup
> -> Use lookup result in Stream processing
> -> Timer elapsed -> Reload lookup data source into in-memory table
> -> Continue processing
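>
> For concreteness, a rough sketch of that flow as a RichMapFunction
> (loadTable() and enrich() are hypothetical helpers; the scheduler is
> plain java.util.concurrent):
>
>     public class LookupMapper extends RichMapFunction<Event, Result> {
>         private transient volatile Map<String, String> table;
>         private transient ScheduledExecutorService refresher;
>
>         @Override
>         public void open(Configuration conf) {
>             table = loadTable();                  // initial in-memory load
>             refresher = Executors.newSingleThreadScheduledExecutor();
>             refresher.scheduleAtFixedRate(
>                 () -> table = loadTable(),        // timer elapsed: reload table
>                 10, 10, TimeUnit.MINUTES);        // period from configuration
>         }
>
>         @Override
>         public Result map(Event event) {
>             return enrich(event, table.get(event.key)); // lookup during processing
>         }
>
>         @Override
>         public void close() {
>             refresher.shutdownNow();
>         }
>     }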
>
>
> My concerns around this are:
>
> 1) Possibly storing the same copy of the data in every task slot's memory
> or state backend (RocksDB in my case).
> 2) Having a dedicated refresh thread for each subtask instance (possibly
> every TaskManager having multiple refresh threads).
>
> Am I thinking in the right direction? Or am I missing something very
> obvious? It's confusing.
>
> Any leads are much appreciated. Thanks in advance.
>
> Cheers,
> Chirag
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Re: In-Memory Lookup in Flink Operators

Posted by Chirag Dewan <ch...@yahoo.in>.
Thanks a lot, David and Fabian.
I will give this a try.

Cheers,
Chirag

Re: In-Memory Lookup in Flink Operators

Posted by David Anderson <da...@data-artisans.com>.
Hi Chirag,

The community is also looking at an approach that involves using
Bravo[1][2] to bootstrap state by loading the initial version of the state
into a savepoint.

[1] https://github.com/king/bravo
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Proposal-Utilities-for-reading-transforming-and-creating-Streaming-savepoints-td23843.html#a23854


-- 
David Anderson | Training Coordinator | data Artisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time