Posted to user@flink.apache.org by Theodore Vasiloudis <th...@gmail.com> on 2017/03/15 13:20:16 UTC

A question about iterations and prioritizing "control" over "data" inputs

Hello all,

I've started thinking about online learning in Flink, and one of the issues
that has come up in other frameworks is the ability to prioritize "control"
over "data" events in iterations.

To give an example, say we develop an ML model that ingests events in
parallel, performs an aggregation to update the model, and then broadcasts
the updated model back through an iteration/back edge. Using the above
nomenclature, the events being ingested would be "data" events, and the
model update would be a "control" event.

I talked about this scenario a bit with a couple of people (Paris and
Gianmarco), and one thing we would like to have is the ability to
prioritize the ingestion of control events over data events.

If my understanding is correct, each operator currently has a buffer/queue
of events waiting to be processed, and each incoming event ends up at the
end of that queue.

If our data source is fast and the model updates are slow, a lot of data
events might be buffered/scheduled to be processed before each model
update, because of the speed difference between the two streams. But we
would like to update the model that is used to process data events as soon
as the newest version becomes available.
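
To make the concern concrete, here is a toy simulation in plain Python (not
Flink code). It assumes a single FIFO input queue per operator, as described
above, and a fixed processing time per event. The function name, the event
tuples, and the numbers are all made up for illustration.

```python
def count_stale(events, service_time):
    """Count data events processed with an outdated model.

    events: list of (arrival_time, kind, payload), sorted by arrival_time,
            where kind is "data" or "control" and a control payload is the
            new model version (an int).
    The operator handles events strictly in arrival order (one FIFO queue).
    """
    control_arrivals = [(t, v) for t, kind, v in events if kind == "control"]
    clock = 0          # time at which the operator becomes free
    model_version = 0  # version currently applied inside the operator
    stale = 0
    for arrival, kind, payload in events:
        start = max(clock, arrival)
        if kind == "control":
            model_version = payload
        else:
            # Newest version that had already arrived when this event started.
            newest = max((v for t, v in control_arrivals if t <= start), default=0)
            if newest > model_version:
                stale += 1
        clock = start + service_time
    return stale


# Data arrives every 10 time units, one model update arrives at t=45.
events = [(t, "data", None) for t in range(0, 100, 10)]
events.append((45, "control", 1))
events.sort(key=lambda e: e[0])

print(count_stale(events, service_time=20))  # 2
```

With a service time of 20 the operator falls behind, so two data events are
processed with the old model after the update has already arrived. With a
service time of 5 the queue never builds up and the count is 0.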

Is it somehow possible to make the control events "jump" the queue and be
processed ahead of the data events as soon as they arrive?

Regards,
Theodore

P.S. This is still very much a theoretical problem; I haven't looked at how
such a pipeline would be implemented in Flink.

Re: A question about iterations and prioritizing "control" over "data" inputs

Posted by Paris Carbone <pa...@kth.se>.
Unless I got this wrong, if he meant relaxing FIFO processing per channel/stream partition, then Robert is absolutely right.

On 23 Mar 2017, at 12:28, Paris Carbone <pa...@kth.se> wrote:

I think what Theo meant is to allow for different (high/low) priorities on different channels (or data streams per se) for n-ary operators such as ConnectedStream binary maps, loops, etc., and not to change the sequence of events within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing algorithm. Checkpoint barriers block committed stream partitions anyway, so there is no priority concern there.

On 23 Mar 2017, at 12:13, Robert Metzger <rm...@apache.org> wrote:

To very quickly respond to Theo's question: No, it is not possible to have records overtake each other in the buffer.
This could potentially void the exactly-once processing guarantees if records overtake checkpoint barriers.


On Wed, Mar 15, 2017 at 5:58 PM, Kathleen Sharp <ka...@signavio.com> wrote:
Hi,

I have a similar-sounding use case and just yesterday was
experimenting with this approach:

Use two separate streams: one for model events, one for data events.
Connect the two, key the resulting stream, and then use a
RichCoFlatMapFunction to ensure that each data event is enriched with
the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives, emit all previously seen events
with the new model event.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so
that downstream operators can remove/add as necessary depending on the
calculations (so for each model change I would emit an add/remove pair
of enriched events).

As I say, I only experimented with this yesterday; perhaps someone
a bit more experienced with Flink might spot some problems with this
approach, which I would definitely be interested in hearing.

Kat





Re: A question about iterations and prioritizing "control" over "data" inputs

Posted by Paris Carbone <pa...@kth.se>.
I think what Theo meant is to allow for different (high/low) priorities on different channels (or data streams per se) for n-ary operators such as ConnectedStream binary maps, loops, etc., and not to change the sequence of events within channels, I guess.

This does not violate the FIFO channel assumptions of the checkpointing algorithm. Checkpoint barriers block committed stream partitions anyway, so there is no priority concern there.
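
The distinction drawn here (priority across channels, FIFO order within each channel) can be sketched in plain Python. This is not an existing Flink feature or API; the class, the channel names, and the polling rule are hypothetical, and the sketch ignores checkpoint barriers entirely.

```python
from collections import deque


class TwoInputOperator:
    """Toy binary operator with one control channel and one data channel."""

    def __init__(self, model):
        self.control = deque()  # model updates, FIFO
        self.data = deque()     # data events, FIFO
        self.model = model
        self.output = []

    def step(self):
        """Process one event; return False when both channels are empty.

        The control channel is always polled first, so a pending model
        update is applied before any further data event. Events are never
        reordered within a channel.
        """
        if self.control:
            self.model = self.control.popleft()
            return True
        if self.data:
            self.output.append((self.data.popleft(), self.model))
            return True
        return False


op = TwoInputOperator("v1")
op.data.extend([1, 2, 3])   # three data events already buffered
op.step()                   # event 1 is processed with model v1
op.control.append("v2")     # a model update arrives
while op.step():
    pass
print(op.output)            # [(1, 'v1'), (2, 'v2'), (3, 'v2')]
```

The data events still come out in the order 1, 2, 3; only the interleaving between the two channels changes.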




Re: A question about iterations and prioritizing "control" over "data" inputs

Posted by Robert Metzger <rm...@apache.org>.
To very quickly respond to Theo's question: No, it is not possible to have
records overtake each other in the buffer.
This could potentially void the exactly-once processing guarantees if
records overtake checkpoint barriers.
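
A toy model of why overtaking a barrier is a problem, in plain Python. It
assumes a single input channel, an operator that only counts records, a
snapshot taken at the moment the barrier is processed, and a source that, on
recovery, replays every record it emitted after the barrier. Real Flink
checkpointing is considerably more involved; the names here are made up.

```python
BARRIER = object()  # stand-in for a checkpoint barrier in the channel


def recovered_count(channel, replayed):
    """Return the record count after a failure and recovery.

    channel:  items in the order the operator processes them before failing.
    replayed: records the source re-sends after restoring the checkpoint,
              i.e. everything it emitted after the barrier.
    """
    count = 0
    snapshot = None
    for item in channel:
        if item is BARRIER:
            snapshot = count  # state captured by the checkpoint
        else:
            count += 1
    if snapshot is None:
        raise ValueError("channel contains no barrier")
    # Failure: state is reset to the snapshot, then the source replays.
    return snapshot + len(replayed)


# The source emitted r1, r2, the barrier, then r3.
in_order = ["r1", "r2", BARRIER, "r3"]
overtaken = ["r1", "r2", "r3", BARRIER]  # r3 jumped ahead of the barrier

print(recovered_count(in_order, replayed=["r3"]))   # 3
print(recovered_count(overtaken, replayed=["r3"]))  # 4
```

In the second case r3 is already included in the snapshot and is then
replayed as well, so it is counted twice.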



Re: A question about iterations and prioritizing "control" over "data" inputs

Posted by Kathleen Sharp <ka...@signavio.com>.
Hi,

I have a similar-sounding use case and just yesterday was
experimenting with this approach:

Use two separate streams: one for model events, one for data events.
Connect the two, key the resulting stream, and then use a
RichCoFlatMapFunction to ensure that each data event is enriched with
the latest model event as soon as a new model event arrives.
Also, as soon as a new model arrives, emit all previously seen events
with the new model event.
This involves keeping events and models in state.
My emitted enriched events have a command-like syntax (add/remove) so
that downstream operators can remove/add as necessary depending on the
calculations (so for each model change I would emit an add/remove pair
of enriched events).
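
A rough sketch of that logic in plain Python, standing in for the two
callbacks of a co-flat-map over one key. It is not Flink code: the class and
method names are hypothetical, the state is ordinary Python attributes, and
buffering data events that arrive before any model is an assumption.

```python
class EnrichingOperator:
    """State for one key: the latest model and all data events seen so far."""

    def __init__(self):
        self.model = None
        self.events = []

    def on_data(self, event):
        """Data input: remember the event and enrich it with the latest model."""
        self.events.append(event)
        if self.model is None:
            return []  # assumption: wait for the first model before emitting
        return [("add", event, self.model)]

    def on_model(self, new_model):
        """Model input: re-emit every seen event as a remove/add pair."""
        out = []
        for event in self.events:
            if self.model is not None:
                out.append(("remove", event, self.model))
            out.append(("add", event, new_model))
        self.model = new_model
        return out


op = EnrichingOperator()
op.on_data("e1")        # buffered, nothing emitted yet
print(op.on_model("m1"))  # [('add', 'e1', 'm1')]
print(op.on_model("m2"))  # [('remove', 'e1', 'm1'), ('add', 'e1', 'm2')]
```

Note that the list of seen events grows without bound in this sketch, so a
real job would need some way to expire old events from state.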

As I say, I only experimented with this yesterday; perhaps someone
a bit more experienced with Flink might spot some problems with this
approach, which I would definitely be interested in hearing.

Kat
