Posted to user@flink.apache.org by Ron Crocker <rc...@newrelic.com> on 2018/01/20 00:43:57 UTC

Forcing consuming one stream completely prior to another starting

I’m joining two streams - one is a “decoration” stream that we have in a compacted Kafka topic, produced using a view on a MySQL table AND using Kafka Connect; the other is the “event data” we want to decorate, coming in over time via Kafka. These streams are keyed the same way - via an “id” field, and we join them using CoFlatMap that attaches the data from the decoration stream to the event data and publishes downstream.

What I’d like to do in my CoFlatMap is wait to process the event data until we’ve received the corresponding decoration data. How do I make that happen? 

Alternatively, is there another structure that will work to prevent non-decorated event data from being allowed through until the corresponding decoration event appears?

Thanks!

Ron

Re: Forcing consuming one stream completely prior to another starting

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

How can you determine whether the required decoration data for an event from the main stream is there? If it works via event-time you could think about buffering main-input events in the operator until the corresponding decoration arrives.
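The buffering idea can be illustrated outside of Flink. The sketch below is a hypothetical in-memory model of a keyed two-input operator (all names are mine, not from any actual code): decoration records are stored per key, and main-stream events that arrive before their decoration are buffered and flushed once it shows up. In a real Flink job the two maps would be keyed `ValueState`/`ListState` inside a `CoProcessFunction`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the buffer-until-decorated pattern.
// In Flink, 'decorations' and 'buffered' would be keyed state.
class BufferingEnricher {
    private final Map<String, String> decorations = new HashMap<>();
    private final Map<String, List<String>> buffered = new HashMap<>();

    // Decoration input: remember it, then flush any events waiting for this key.
    List<String> processDecoration(String id, String decoration) {
        decorations.put(id, decoration);
        List<String> out = new ArrayList<>();
        for (String event : buffered.getOrDefault(id, List.of())) {
            out.add(event + " + " + decoration);
        }
        buffered.remove(id);
        return out;
    }

    // Main input: emit immediately if the decoration is known, otherwise buffer.
    List<String> processEvent(String id, String event) {
        String decoration = decorations.get(id);
        if (decoration != null) {
            return List.of(event + " + " + decoration);
        }
        buffered.computeIfAbsent(id, k -> new ArrayList<>()).add(event);
        return List.of();
    }
}
```

Note that without a watermark or end-of-stream signal on the decoration side, events whose decoration never arrives stay buffered forever, so a real operator would also want a timer to expire or emit them undecorated.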

On a side note, we're currently working on broadcast inputs and broadcast state, which is a first step towards proper side inputs. This work should land in Flink 1.5.

Best,
Aljoscha

> On 20. Jan 2018, at 09:25, Maxim Parkachov <la...@gmail.com> wrote:
> 
> […]


Re: Forcing consuming one stream completely prior to another starting

Posted by Maxim Parkachov <la...@gmail.com>.
Hi Ron,

> I’m joining two streams - one is a “decoration” stream that we have in a
> compacted Kafka topic, produced using a view on a MySQL table AND using
> Kafka Connect; the other is the “event data” we want to decorate, coming in
> over time via Kafka. These streams are keyed the same way - via an “id”
> field, and we join them using CoFlatMap that attaches the data from the
> decoration stream to the event data and publishes downstream.
>
> What I’d like to do in my CoFlatMap is wait to process the event data
> until we’ve received the corresponding decoration data. How do I make that
> happen?
>
>
I have exactly the same scenario and have researched this. Basically, what we
need is
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API,
but unfortunately it seems like nobody is actively working on it. You could
still check the ideas there, though.

Side inputs seem to be implemented in the Beam API and should work on the
Flink runner, but I didn’t try it, for various reasons.

I ended up with the following solution:

- In the script starting the job, I create a stop file on a shared filesystem (HDFS).
- I created two SourceFunctions extending the Kafka source.
- In the run method of the source function for the “decoration” stream, I
consume all records from the compacted topic. The tricky part is identifying
when everything has been consumed. I resolved it by reading the Kafka end
offsets directly with the Kafka admin API and checking whether I have reached
those offsets. After waiting a bit to make sure the events have propagated to
the next operator, I delete the stop file on the shared filesystem.
- In the source function for the event data, I implemented the “open” method
to wait until the stop file is deleted. This keeps it from consuming event
data until the decoration stream has been read completely.
- In the pipeline, I broadcast the “decoration” events and used a
CoProcessFunction to store them in state and enrich the main event stream.
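The two coordination pieces above can be sketched as follows (a minimal sketch with names of my own choosing, not Maxim's actual code): a blocking wait on the stop file that the event source's open() method could call, and the "fully consumed" condition expressed as a pure predicate over per-partition positions versus the end offsets that the Kafka consumer/admin API reports (e.g. KafkaConsumer#endOffsets), modelled here as plain maps so the logic is testable without a broker:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

class StartupGate {
    // Poll until the stop file disappears; the event source's open() would
    // call this before starting to consume. (Sketch: a real job would use the
    // HDFS FileSystem API instead of java.nio and likely add a timeout.)
    static void awaitStopFileDeleted(Path stopFile, long pollMillis)
            throws InterruptedException {
        while (Files.exists(stopFile)) {
            Thread.sleep(pollMillis);
        }
    }

    // The compacted topic is fully consumed once every partition's position
    // has reached the end offset read from Kafka; both sides are modelled
    // here as maps from partition id to offset.
    static boolean fullyConsumed(Map<Integer, Long> positions,
                                 Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long pos = positions.getOrDefault(e.getKey(), 0L);
            if (pos < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

One caveat with the end-offset approach: if producers are still writing to the compacted topic while the job starts, the end offsets are a moving target, so the check only guarantees you have read everything up to the moment the offsets were fetched.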

The application is not in production yet as I need to do more testing, but
it seems to work.

Additionally, I tried caching the decoration data in the source function's
state to recover more easily from checkpoints, but I'm still not sure whether
it's better to read it from the compacted topic every time, keep an additional
cache in the source function, or rely on the state in the CoProcessFunction
alone.

Hope this helps; I would be interested to hear about your experience.

Regards,
Maxim.