Posted to user@flink.apache.org by Sand Stone <sa...@gmail.com> on 2017/05/18 23:11:03 UTC

Best practices to maintain reference data for Flink Jobs

Hi. Say I have a few reference data sets that need to be used by a
streaming job. The sizes range between 10M-10GB. The data is not
static; it will be refreshed at intervals of minutes and/or days.

With the new advancements in Flink, it seems there are quite a few options.
   A. Store all the data in an external (kv) database cluster and use
async I/O calls
          * data refresh can be done in a few different ways
   B. Use the new Queryable State feature
            * it seems there is no "easy" API to discover the
queryable state at the moment; one needs to use the REST API to figure
out the job ID.
   C. Ingest the reference data into the job and cache it in memory
Any other options?

On paper, it seems option B with the Queryable State is the cleanest solution.

Any comments or suggestions are greatly appreciated, in particular in
terms of robustness and consistent recovery.

Thanks much!

Re: Best practices to maintain reference data for Flink Jobs

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

If you need range queries for the lookups, you can only use Option A (async
calls to an external store).
Queryable State only supports key lookups, not range queries.

Since version 1.2.0, Flink has a dedicated function type for async calls
[1].
This might be helpful for implementing your use case.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/asyncio.html
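
To illustrate the shape of option A, here is a minimal plain-Java sketch of
asynchronous enrichment against an external store. It deliberately does not
use Flink's async function type from [1]; KvClient, enrich and the sample
data are hypothetical names made up for this illustration, and the "store"
is just an in-memory map queried on a thread pool. The point is only that
all lookups are issued without blocking and the results are then collected
in input order.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Plain-Java model of the async enrichment pattern; NOT Flink's async I/O API. */
final class AsyncEnrichmentSketch {

    /** Hypothetical stand-in for a client of the external key-value store. */
    static final class KvClient {
        private final Map<String, String> data;
        private final ExecutorService pool;

        KvClient(Map<String, String> data, ExecutorService pool) {
            this.data = data;
            this.pool = pool;
        }

        /** Returns immediately; the lookup itself runs on the pool. */
        CompletableFuture<String> get(String key) {
            return CompletableFuture.supplyAsync(() -> data.getOrDefault(key, "unknown"), pool);
        }
    }

    /** Issues all lookups concurrently, then collects the results in input order. */
    static List<String> enrich(List<String> events, KvClient client) {
        List<CompletableFuture<String>> pending = events.stream()
                .map(e -> client.get(e).thenApply(ref -> e + " -> " + ref))
                .collect(Collectors.toList());
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            KvClient client = new KvClient(
                    Map.of("10.0.0.1", "office", "10.0.0.2", "datacenter"), pool);
            System.out.println(enrich(List.of("10.0.0.1", "10.0.0.9", "10.0.0.2"), client));
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real job the store client, timeouts and the number of in-flight
requests would need to be chosen for the actual database; none of that is
modelled here.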


Re: Best practices to maintain reference data for Flink Jobs

Posted by Sand Stone <sa...@gmail.com>.
Also, I took a quick read on side inputs. It's unclear to me how side
inputs could solve this issue better.

At a high level, this is what I have in mind:
                flatmap(byte[] value, Collector<> output) {
                    // range scan by key prefix;
                    // or seek(akeyprefix, akeysuffix);
                    var iter = someStoreStateObject.seek(akeyprefix);
                    for (byte[] key : iter) {}
                }

Thanks for your time!



Re: Best practices to maintain reference data for Flink Jobs

Posted by Sand Stone <sa...@gmail.com>.
Thanks Gordon and Fabian.

The enriching data is really reference data, e.g. the reverseIP
database. It's hard to key it like the main data stream, as the "IP
address" in the event is not a primary key in the main data stream.

QueryableState is close, but it does not support range scans as far as
I could tell. The remote datastore has clean semantics: a logical
single copy plus support for range scans, but the RPC to another cluster
is not optimal.

I assume this is quite a common stream processing pattern for
Flink-based services.
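
To make the range-scan requirement concrete, here is a minimal plain-Java
sketch of a reverse-IP lookup over an ordered map. ReverseIpIndex, its
methods and the sample ranges are hypothetical names for illustration, and
the sketch assumes IPv4 ranges that do not overlap. The lookup seeks to the
greatest range start that is not above the address and then checks the end
bound, which is exactly the kind of ordered access that an exact-key lookup
cannot provide.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** In-memory reverse-IP index: maps IPv4 ranges to labels and answers point lookups. */
final class ReverseIpIndex {

    /** Inclusive end of a range plus the label attached to that range. */
    private static final class Range {
        final long end;
        final String label;

        Range(long end, String label) {
            this.end = end;
            this.label = label;
        }
    }

    // Keyed by range start; assumes the ranges do not overlap.
    private final TreeMap<Long, Range> byStart = new TreeMap<>();

    /** Converts dotted-quad IPv4 text to an unsigned 32-bit value held in a long. */
    static long toLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range in: " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Registers the inclusive range [startIp, endIp] under the given label. */
    void put(String startIp, String endIp, String label) {
        byStart.put(toLong(startIp), new Range(toLong(endIp), label));
    }

    /** Seeks to the greatest range start <= ip, then checks the end bound. */
    Optional<String> lookup(String ip) {
        long key = toLong(ip);
        Map.Entry<Long, Range> entry = byStart.floorEntry(key);
        return (entry != null && key <= entry.getValue().end)
                ? Optional.of(entry.getValue().label)
                : Optional.empty();
    }

    public static void main(String[] args) {
        ReverseIpIndex index = new ReverseIpIndex();
        index.put("10.0.0.0", "10.0.0.255", "office");
        index.put("192.168.1.0", "192.168.1.127", "lab");
        System.out.println(index.lookup("10.0.0.42"));
        System.out.println(index.lookup("10.0.1.1"));
    }
}
```

A hash-based key lookup can only answer "is this exact address stored?",
whereas the ordered structure answers "which range contains this address?".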



Re: Best practices to maintain reference data for Flink Jobs

Posted by Fabian Hueske <fh...@gmail.com>.
+1 to what Gordon said.

Queryable state is meant as an external interface to streaming jobs
rather than for lookups within jobs.
Accessing co-located state should give you better performance and is
probably easier to implement and maintain.

Cheers,
Fabian


Re: Best practices to maintain reference data for Flink Jobs

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Can the enriching data be keyed? Or is it something that has to be broadcast to each operator?
Either way, I think Side Inputs (an upcoming feature) are the best fit for this. You can take a look at https://issues.apache.org/jira/browse/FLINK-6131.

Regarding the 3 options you listed:

By using QueryableState in option B, what you mean is that you want to feed the enriching data stream to a separate job, let that job expose queryable state, and query that state from the actual application job operators, correct? If so, I think options A and B would mean the same thing; i.e., they require accessing data external to the job.

If the enriching data can somehow be keyed with the stream that requires it, I would go for option C using connected streams, with the enriching data as one input and the actual data as the other. Instead of just “caching the enriching data in memory”, you should register it as managed Flink state for the CoMapFunction / CoFlatMapFunction. The actual input stream records can just access that registered state locally.
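
As a rough illustration of the connected-streams idea, here is a plain-Java model of a two-input operator. It is a sketch only and does not use Flink classes: TwoInputEnricher, onReference and onEvent are hypothetical names, and the HashMap merely stands in for keyed state. In an actual job the two callbacks would correspond to the two inputs of a CoFlatMapFunction, and the map would be replaced by state registered with Flink so that it is checkpointed and restored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a two-input (connected streams) enrichment operator. */
final class TwoInputEnricher {

    // Stands in for managed keyed state. A plain map like this is NOT
    // checkpointed; it only models what the state holds.
    private final Map<String, String> referenceByKey = new HashMap<>();

    /** Input 1: a reference-data update replaces the stored value for its key. */
    void onReference(String key, String value) {
        referenceByKey.put(key, value);
    }

    /** Input 2: an event is enriched from local state, without any remote call. */
    void onEvent(String key, String payload, List<String> out) {
        String ref = referenceByKey.get(key);
        out.add(payload + "|" + (ref != null ? ref : "<no reference yet>"));
    }

    public static void main(String[] args) {
        TwoInputEnricher op = new TwoInputEnricher();
        List<String> out = new ArrayList<>();
        op.onEvent("k1", "e0", out);   // arrives before any reference data
        op.onReference("k1", "v1");
        op.onEvent("k1", "e1", out);
        op.onReference("k1", "v2");    // refresh of the reference data
        op.onEvent("k1", "e2", out);
        System.out.println(out);
    }
}
```

The first event shows a case the real job has to decide on as well: what to do with records that arrive before their reference data does.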

Cheers,
Gordon
