Posted to user@flink.apache.org by Peter Lappo <pe...@systematicmethods.com> on 2017/09/10 21:59:59 UTC

ETL with changing reference data

hi,
We are building an ETL-style application in Flink that consumes records from a file or a message bus as a DataStream. We are transforming records using SQL and UDFs. The UDF loads reference data in the open method and currently the data loaded remains in memory until the job is cancelled. The eval method of the UDF is used to do the actual transformation on a particular field.
Of course, reference data changes, and data will then need to be reprocessed. Let's assume we can identify and resubmit records for reprocessing: what is the best design that
* keeps the Flink job running
* reloads the changed reference data
so that records are reprocessed in a deterministic fashion?

Two options spring to mind
1) send a control record to the stream that reloads reference data or part of it and ensure resubmitted records are processed after the reload message
2) use a separate thread to poll the reference data source and reload any changed data which will of course suffer from race conditions
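
A minimal sketch of option 1, written in plain Python rather than against the Flink API. It assumes a single ordered stream and one operator instance; the names Reload, Record and EnrichOperator are hypothetical and only illustrate that a record placed after the control record sees the reloaded data.

```python
from dataclasses import dataclass


@dataclass
class Reload:
    """Hypothetical control record: replaces part of the reference data."""
    updates: dict


@dataclass
class Record:
    key: str
    amount: float


class EnrichOperator:
    """Stands in for one parallel instance of the enriching UDF."""

    def __init__(self, reference):
        self.reference = dict(reference)
        self.version = 0  # bumped on every reload, useful for auditing

    def process(self, element):
        if isinstance(element, Reload):
            # Control record: apply the update in stream order, emit nothing.
            self.reference.update(element.updates)
            self.version += 1
            return None
        rate = self.reference[element.key]
        return (element.key, element.amount * rate, self.version)


def run(stream, reference):
    op = EnrichOperator(reference)
    return [out for out in map(op.process, stream) if out is not None]


# The resubmitted record is placed after the reload message.
stream = [Record("EUR", 10.0), Reload({"EUR": 2.0}), Record("EUR", 10.0)]
results = run(stream, {"EUR": 1.5})
```

With more than one parallel instance, the control record would have to reach every instance, which this sketch does not cover.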

Or is there a better way of solving this type of problem with Flink?

Thanks
Peter

Re: ETL with changing reference data

Posted by Chen Qin <qi...@gmail.com>.
Hi Peter,

If I understand correctly, you are facing the dilemma of wanting both
efficient dynamic reference lookups and scalable processing.
I don't have an answer for how things would work in your specific case, but
it is an interesting topic to discuss.

Fabian provided some insights and I would like to add a little to his idea. I
think you already pointed this out in 1).

Can the reference updates carry meaningful event timestamps, so that they can
be merged into the event time of the main stream?
If so, could you buffer the enrichment data as well as the main pipeline data
in windows, and retract results at certain points in time using control
messages?
That generally means less efficient state management, and whether the extra
overhead is acceptable depends on the case. Thanks to incremental
checkpoints, this is less of a problem than it was in 1.2.
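
To illustrate the event-time idea, here is a small sketch in plain Python, not Flink code. It assumes every reference update carries a valid-from event timestamp; the class VersionedReference and its methods are hypothetical names. A record is enriched with the value that was valid at the record's own event time, so the result does not depend on when the update happened to arrive.

```python
import bisect


class VersionedReference:
    """Keeps every value of a key together with the event time it became valid."""

    def __init__(self):
        self._times = {}   # key -> sorted list of valid-from timestamps
        self._values = {}  # key -> values aligned with _times

    def update(self, key, valid_from, value):
        times = self._times.setdefault(key, [])
        values = self._values.setdefault(key, [])
        # Insert in timestamp order so late-arriving updates land correctly.
        i = bisect.bisect_right(times, valid_from)
        times.insert(i, valid_from)
        values.insert(i, value)

    def lookup(self, key, event_time):
        times = self._times.get(key, [])
        # Index of the last version whose valid-from is <= event_time.
        i = bisect.bisect_right(times, event_time) - 1
        return self._values[key][i] if i >= 0 else None


ref = VersionedReference()
ref.update("EUR", 200, 2.0)
ref.update("EUR", 100, 1.5)  # arrives later, but is valid earlier
early = ref.lookup("EUR", 150)
late = ref.lookup("EUR", 250)
```

Keeping all versions is what costs the extra state mentioned above; in practice old versions would be dropped once the watermark has passed them.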

I would not recommend solving the problem outside of Flink primitives, as you
will find yourself rebuilding everything Flink already tries to solve.
That usually ends with building a customized streaming system yourself.

Thanks,
Chen

On Thu, Sep 14, 2017 at 6:36 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Peter,
>
> in principle, joining the data stream with the reference data would be the
> most natural approach to enrich data.
> However, stream joins for the Table API / SQL are currently under
> development and not available yet.
>
> You can of course try to solve the issue manually using UDFs but this will
> require many tweaks and might be fragile.
> I would set up the input as a union of two tables. The first is the regular
> data stream, the second one is used to feed in records that need to be
> reprocessed.
> The enriching UDFs could poll for updates and load them.
> In this approach, you need to somehow synchronize updating the reference
> data and feeding in the records to reprocess to ensure that all functions
> have the latest version of the data when reprocessing.
>
> Best, Fabian

Re: ETL with changing reference data

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Peter,

in principle, joining the data stream with the reference data would be the
most natural approach to enrich data.
However, stream joins for the Table API / SQL are currently under
development and not available yet.

You can of course try to solve the issue manually using UDFs but this will
require many tweaks and might be fragile.
I would set up the input as a union of two tables. The first is the regular
data stream, the second one is used to feed in records that need to be
reprocessed.
The enriching UDFs could poll for updates and load them.
In this approach, you need to somehow synchronize updating the reference
data and feeding in the records to reprocess to ensure that all functions
have the latest version of the data when reprocessing.
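
One possible way to do that synchronization, sketched in plain Python under stated assumptions: the reference source exposes a version counter, and each record fed in for reprocessing carries the minimum version it needs. ReferenceStore, EnrichFunction and min_version are hypothetical names, not part of Flink.

```python
class ReferenceStore:
    """Hypothetical external source of reference data with a version counter."""

    def __init__(self, data):
        self.version = 1
        self.data = dict(data)

    def publish(self, updates):
        self.data.update(updates)
        self.version += 1

    def snapshot(self):
        return self.version, dict(self.data)


class EnrichFunction:
    """Stands in for the enriching UDF; loads data once, reloads on demand."""

    def __init__(self, store):
        self.store = store
        self.version, self.data = store.snapshot()  # like open()

    def eval(self, key, amount, min_version=0):
        if self.version < min_version:
            # The record needs newer data than this instance holds: poll now.
            self.version, self.data = self.store.snapshot()
        if self.version < min_version:
            raise RuntimeError("required reference version not available yet")
        return amount * self.data[key], self.version


store = ReferenceStore({"EUR": 1.5})
fn = EnrichFunction(store)

regular = [("EUR", 10.0, 0)]
store.publish({"EUR": 2.0})
reprocess = [("EUR", 10.0, store.version)]  # tagged with the version it needs

# Union of the regular input and the reprocessing input.
out = [fn.eval(*record) for record in regular + reprocess]
```

Because the check happens per record, every parallel instance would reload independently the first time it sees a record that needs the newer version.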

Best, Fabian



2017-09-11 23:01 GMT+02:00 Peter Lappo <pe...@systematicmethods.com>:

> Thanks Chen
> We add our reference data to a JVM global hash map, one map per reference
> data type (we found Flink table joins were too slow as they were doing a
> table scan), so a side pipeline to update the reference data is a nice idea
> but may suffer from concurrency issues. If there are pending records to
> process, these may get processed before the reference data update,
> especially if fetching reference data is slow, as it is in our case.
>
> Having said that, processing reference data sequentially in the main
> pipeline doesn’t help either if there is more than one parallel data stream.

Re: ETL with changing reference data

Posted by Peter Lappo <pe...@systematicmethods.com>.
Thanks Chen
We add our reference data to a JVM global hash map, one map per reference data type (we found Flink table joins were too slow as they were doing a table scan), so a side pipeline to update the reference data is a nice idea but may suffer from concurrency issues. If there are pending records to process, these may get processed before the reference data update, especially if fetching reference data is slow, as it is in our case.

Having said that, processing reference data sequentially in the main pipeline doesn’t help either if there is more than one parallel data stream.
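
A rough sketch of one way to limit the concurrency problem with a shared map, in plain Python rather than the JVM. The assumption is that the reference data is kept as an immutable, versioned snapshot that a background thread replaces as a whole; ReferenceHolder and enrich are hypothetical names. A record then sees either the old or the new data, never a half-updated map, and the version it used can be recorded.

```python
import threading
from types import MappingProxyType


class ReferenceHolder:
    """Holds an immutable (version, data) snapshot that is swapped as a whole."""

    def __init__(self, data):
        self._lock = threading.Lock()
        self._snapshot = (1, MappingProxyType(dict(data)))

    def current(self):
        # One reference read: callers get a consistent version/data pair.
        return self._snapshot

    def reload(self, updates):
        with self._lock:  # serialize writers; readers never block
            version, data = self._snapshot
            merged = {**data, **updates}
            self._snapshot = (version + 1, MappingProxyType(merged))


holder = ReferenceHolder({"EUR": 1.5, "GBP": 1.8})


def enrich(key, amount):
    version, data = holder.current()
    return amount * data[key], version


before = enrich("EUR", 10.0)
reloader = threading.Thread(target=holder.reload, args=({"EUR": 2.0, "GBP": 2.5},))
reloader.start()
reloader.join()
after = enrich("EUR", 10.0)
```

This does not decide whether a pending record is processed before or after the reload; it only makes the outcome well defined and visible through the version number.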


> On 11 Sep 2017, at 02:29, qinnchen@gmail.com wrote:
> 
> Hi Peter,
>  
> I think what you are describing is a typical amendment process, where some or all results need to be modified. It is definitely an interesting topic! Here are my two cents.
> 
> In an ideal world, the reference data source would emit the updated values that are in use as events, and these would be joined with buffered events in windows. (It’s a bit counterintuitive, but imagine a magic function through which we ingest all reference data as a stream instead of doing on-demand RPC.)
> 
> Unfortunately, in lots of use cases it is hard to know exactly how the reference data source is used, and dumping all the reference data costs too much. So replaying the pipeline might be the cheapest way to get things done in general.
> 
> In some cases, results are partitioned and bounded. That makes it possible to recompute within bounded windows, though it may require a bit of work to customize a window that is held longer than the point at which the watermark passes its end time. I remember there was a Jira discussion about retraction.
> In other cases, results are derived from a long history that is not reasonable to keep. A side pipeline that captures those events, with late-arriving event handling, might interact with external storage and amend the results.
>  
> Thanks,
> Chen


RE: ETL with changing reference data

Posted by qi...@gmail.com.
Hi Peter,

I think what you are describing is a typical amendment process, where some or all results need to be modified. It is definitely an interesting topic! Here are my two cents.

In an ideal world, the reference data source would emit the updated values that are in use as events, and these would be joined with buffered events in windows. (It’s a bit counterintuitive, but imagine a magic function through which we ingest all reference data as a stream instead of doing on-demand RPC.)

Unfortunately, in lots of use cases it is hard to know exactly how the reference data source is used, and dumping all the reference data costs too much. So replaying the pipeline might be the cheapest way to get things done in general.

In some cases, results are partitioned and bounded. That makes it possible to recompute within bounded windows, though it may require a bit of work to customize a window that is held longer than the point at which the watermark passes its end time. I remember there was a Jira discussion about retraction.
In other cases, results are derived from a long history that is not reasonable to keep. A side pipeline that captures those events, with late-arriving event handling, might interact with external storage and amend the results.
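
The bounded-window case could look roughly like the following plain-Python sketch. It is not Flink's window or retraction API; RetractingWindow and the "+"/"-" markers are hypothetical, and the window is assumed to keep its records after it has fired so that a reference update can recompute the result.

```python
class RetractingWindow:
    """Keeps its records after firing so a result can be amended later."""

    def __init__(self, rate):
        self.rate = rate      # reference value used for enrichment
        self.records = []     # buffered amounts, retained after firing
        self.emitted = None   # last result sent downstream, if any

    def _total(self):
        return sum(amount * self.rate for amount in self.records)

    def add(self, amount):
        self.records.append(amount)

    def fire(self):
        # Watermark passed the window end: emit the first result.
        self.emitted = self._total()
        return [("+", self.emitted)]

    def update_rate(self, rate):
        # Reference data changed: retract the old result, emit the new one.
        self.rate = rate
        if self.emitted is None:
            return []
        old, self.emitted = self.emitted, self._total()
        return [("-", old), ("+", self.emitted)]


window = RetractingWindow(rate=1.5)
window.add(10.0)
window.add(20.0)
first = window.fire()
amended = window.update_rate(2.0)
```

How long such a window is retained is the trade-off: the longer it is held, the more state has to be kept.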

Thanks,
Chen
 
