Posted to dev@flink.apache.org by Lei Chen <le...@gmail.com> on 2017/06/30 19:22:50 UTC

connect data stream with parameter stream

Hi,

In my scenario I have two streams. DS1 is the main data stream, reading logs
from Kafka, and DS2 is a parameter stream used to maintain state about all
the processing parameters (including filters) that DS1 needs to apply at
runtime. The processing parameters can change at any time while the job is
running.

DS1 is a windowed stream; DS2 is just a normal, non-keyed stream. How can I
connect these two streams so that DS1 can apply those parameters in its
window function by reading the up-to-date parameter state maintained by DS2?


thanks
Lei

Re: connect data stream with parameter stream

Posted by Lei Chen <le...@gmail.com>.
Hi Aljoscha,

Thanks a lot for your help! Yes, solution 2 pointed me in the right
direction. I ended up connecting the streams before the window function so
that I can filter out uninteresting elements earlier.
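A framework-free sketch of the "connect before the window" idea, assuming a hypothetical LogEvent type, a parameter that is simply the log level to keep, and tumbling windows whose size is passed to the constructor. It models only the logic and uses no Flink classes; in an actual job the filter step would roughly be a two-input function on DS1 connected with the broadcast DS2, followed by keyBy/window.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical log event from DS1: an event-time timestamp (ms) and a log level.
final class LogEvent {
    final long timestamp;
    final String level;

    LogEvent(long timestamp, String level) {
        this.timestamp = timestamp;
        this.level = level;
    }
}

// Plain-Java model (not Flink API): a two-input step that drops uninteresting
// events using the current filter, followed by a tumbling-window count.
final class FilterThenWindow {
    private final long windowSizeMs;
    // Current filter from DS2; null means "keep everything".
    private String wantedLevel = null;
    // Window start timestamp -> number of events that passed the filter.
    private final Map<Long, Long> countsPerWindow = new TreeMap<>();

    FilterThenWindow(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Input 2 (DS2): a parameter update replaces the current filter.
    void onParameter(String level) {
        this.wantedLevel = level;
    }

    // Input 1 (DS1): filter first, so dropped events never reach window state.
    void onEvent(LogEvent e) {
        if (wantedLevel != null && !wantedLevel.equals(e.level)) {
            return;
        }
        long windowStart = e.timestamp - (e.timestamp % windowSizeMs);
        countsPerWindow.merge(windowStart, 1L, Long::sum);
    }

    Map<Long, Long> counts() {
        return countsPerWindow;
    }
}
```

Filtering before the window keeps window state small, because events rejected by the current parameters are never buffered.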

thanks,
Lei

On Tue, Jul 4, 2017 at 9:33 AM, Aljoscha Krettek <al...@apache.org>
wrote:


Re: connect data stream with parameter stream

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Lei,

I’m afraid there is currently no API for doing this in one operation. I see two options right now:

1. Build a custom operator that implements windowing and also has a second input for the parameter stream. This would be an implementation of TwoInputStreamOperator. As an example, you can look at KeyedCoProcessOperator, which is the operator implementation for a two-input process function (CoProcessFunction). This variant gives you the most flexibility, but it’s a bit involved.

2. Use two separate steps, i.e. first do the windowed operation and then have a second operation that combines the window results with the parameter stream. Something like this:

DataStream<T> input = …;
DataStream<P> parameterStream = …;
input
  .keyBy(…)
  .window(…)
  .reduce()/process()/apply() // the operation that you want to perform
  .connect(parameterStream.broadcast()) // broadcast so that every parallel instance receives all parameters
  .process(new MyCoProcessFunction())

Here MyCoProcessFunction would receive the results of the windowed operation on input 1 and the parameter stream on input 2. The function would keep state based on the parameter stream (you should checkpoint this; see CheckpointedFunction, and especially OperatorStateStore.getUnionListState()) and process elements that come in on input 1 based on this state.
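The two-input logic described above can be modelled in plain Java. This is a sketch, not Flink API: WindowResult, ParamUpdate and the "minCount" parameter (used here as a filter threshold) are hypothetical. In a real CoProcessFunction, onParameter and onWindowResult would correspond to processElement2 and processElement1, and the map returned by snapshot() is the state you would checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical result of the windowed operation (input 1): a key and a count.
final class WindowResult {
    final String key;
    final long count;

    WindowResult(String key, long count) {
        this.key = key;
        this.count = count;
    }
}

// Hypothetical parameter update (input 2): a parameter name and its new value.
final class ParamUpdate {
    final String name;
    final long value;

    ParamUpdate(String name, long value) {
        this.name = name;
        this.value = value;
    }
}

// Plain-Java model of the co-process logic; it does not use Flink classes.
final class ParameterizedCombiner {
    // Latest value of every parameter seen so far on input 2.
    private final Map<String, Long> params = new HashMap<>();

    // Input 2: remember the newest value of the parameter.
    void onParameter(ParamUpdate update) {
        params.put(update.name, update.value);
    }

    // Input 1: apply the current parameters to a window result.
    // Results below the hypothetical "minCount" threshold are dropped.
    List<String> onWindowResult(WindowResult result) {
        List<String> out = new ArrayList<>();
        long minCount = params.getOrDefault("minCount", 0L);
        if (result.count >= minCount) {
            out.add(result.key + "=" + result.count);
        }
        return out;
    }

    // Copy of the parameter state that would need to be checkpointed.
    Map<String, Long> snapshot() {
        return new HashMap<>(params);
    }
}
```

Flink gives no ordering guarantee between the two inputs of connected streams, so a window result may be handled before a parameter update that was emitted earlier; the model simply uses whatever parameters it has seen so far.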

Union list state works like this: each parallel operator instance can put a number of entries into state. When checkpointing, the state of all parallel instances is collected and checkpointed. When restoring (after a failure, for example), all of that state is sent to every parallel operator instance. In your case (I’m assuming that the parameter stream is broadcast, so that all parallel operator instances get the same input and therefore hold the same state), you would checkpoint only the state of parallel operator instance 0. On restore, this state is distributed to all instances, so they all have the same state again.
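The snapshot/restore rule described above can be simulated in plain Java. This is only a model of the redistribution semantics, not Flink code: each "instance" is a list of hypothetical parameter strings, and in a real function the subtask index would come from the runtime context (getRuntimeContext().getIndexOfThisSubtask()).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of union list state with the "only instance 0 writes" scheme.
final class UnionListStateModel {

    // What one parallel instance writes into its union list state on checkpoint.
    // Only subtask 0 writes, so the identical broadcast state is not stored N times.
    static List<String> contribution(int subtaskIndex, List<String> localParams) {
        List<String> written = new ArrayList<>();
        if (subtaskIndex == 0) {
            written.addAll(localParams);
        }
        return written;
    }

    // Checkpoint: the contributions of all instances are concatenated.
    static List<String> checkpoint(List<List<String>> localStates) {
        List<String> union = new ArrayList<>();
        for (int i = 0; i < localStates.size(); i++) {
            union.addAll(contribution(i, localStates.get(i)));
        }
        return union;
    }

    // Restore: every instance (possibly with a different parallelism)
    // receives the complete checkpointed list.
    static List<List<String>> restore(List<String> checkpointed, int newParallelism) {
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(checkpointed));
        }
        return restored;
    }
}
```

Writing from every instance would also be correct, but after restore each instance would then see one copy of the parameters per former instance and would have to deduplicate them.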

Does that help?

Best,
Aljoscha
