Posted to user@beam.apache.org by David Desberg <da...@uber.com> on 2016/07/15 22:06:16 UTC

Stateful Stream Processing

Hi all,

I'm working on a simple Beam app which should compute an exponential
weighted moving average on a stream of data, by key. The data is windowed
at a fixed interval, the count of the elements is taken per-key, and then
this count should be utilized to update the moving average. This requires
maintaining state (local to each JVM instance/per-key) in the form of the
result of the previous computation. Either a key-value based state store
(as is available in Flink) or an implementation of scan semantics (where
the result of the previous computation is passed as the initial value to
the next invocation) would work; however, there does not seem to be a way
to achieve either of these with the Beam API as it currently stands.
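
For readers following the thread, the "scan" semantics described above can be sketched in plain Python, outside of Beam. This is only an illustration under stated assumptions: the smoothing factor `ALPHA`, the helper names, and the choice to seed each key's average with its first count are all hypothetical and do not come from the Beam API or from the original post.

```python
from itertools import accumulate

ALPHA = 0.5  # assumed smoothing factor; the thread does not specify one


def ewma_step(previous, count, alpha=ALPHA):
    """Fold one window's count into the previous moving average."""
    return alpha * count + (1 - alpha) * previous


def scan_ewma(counts, alpha=ALPHA):
    """Scan over one key's per-window counts, given in window order.

    The first count seeds the average (an assumption); each later step
    receives the previous result as its initial value.
    """
    return list(accumulate(counts, lambda prev, c: ewma_step(prev, c, alpha)))


def scan_by_key(windowed_counts, alpha=ALPHA):
    """Run the same scan independently for every key.

    `windowed_counts` is an iterable of (key, count) pairs that are
    already in window order for each key. The dict plays the role of a
    per-key state store.
    """
    state = {}
    output = []
    for key, count in windowed_counts:
        if key in state:
            state[key] = ewma_step(state[key], count, alpha)
        else:
            state[key] = float(count)
        output.append((key, state[key]))
    return output
```

For example, `scan_by_key([("a", 4), ("b", 10), ("a", 8)])` emits one updated average per input pair, with key "a" and key "b" tracked separately. The sketch assumes the counts for a key arrive in window order.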

I noticed a related JIRA issue (
https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress has
been made. Is there a vision/roadmap for this API? I would be happy to
contribute to the project by beginning an implementation and would love to
collaborate with anyone already working toward this goal.

Thanks!
David Desberg

Re: Stateful Stream Processing

Posted by amir bahmanyari <am...@yahoo.com>.
In the meantime, the poor man's best choice (mine, at least) is to use an in-memory database to maintain "state". I am using open-source Redis, and it works. However, the network round trips make my Beam app on Flink run slower than it would if it kept state in Java collection objects that are available only at runtime.
  
FYI. Cheers

From: David Desberg <da...@uber.com>
To: user@beam.incubator.apache.org
Sent: Friday, July 15, 2016 6:59 PM
Subject: Re: Stateful Stream Processing
   
Kenn,

Gotcha. When do you think that revision will be done, out of curiosity? Also, the EWMA example is a bit simplified; there are other forecasting algorithms we plan to implement which require more complex state management.

As of now, our plan is to create a sort of "scan" function which can be applied to a pipeline, with semantics as described in my previous email, and implement it for the Flink pipeline translator/runner. Any thoughts on this/is a similar construct at all part of the Beam roadmap? Trying to create something that would be useful for the community at-large, not just us.

David

On Fri, Jul 15, 2016 at 3:51 PM, Kenneth Knowles <kl...@google.com> wrote:

Hi David,

I'm responsible for that feature; it was under design review at the advent of Beam (hence the low issue number). I'm working on prepping a revision that adds context and generalizes to Beam, rather than just Dataflow.

For your use case, it isn't as simple as stateful decay if you care about event time, since windowing does not actually reorder your inputs. With a watermark-based trigger you are most likely close enough, though even then if the watermark passes the end of multiple windows in one update they are all permitted to be output, in any order. And transport between transforms is not required to be order preserving, though obviously in many backends it is, especially to support stateful pipelines. We want to talk about ordering explicitly in Beam, or else presume a lack of order.

The good news is you can calculate the EWMA directly using sliding windows that are large enough so their tail has negligible contribution. This should work well and as a bonus you'll get the full time series.

Apologies if I've misunderstood what you are hoping to accomplish.

Kenn

On Jul 15, 2016 3:06 PM, "David Desberg" <da...@uber.com> wrote:
>
> Hi all,
>
> I'm working on a simple Beam app which should compute an exponential weighted moving average on a stream of data, by key. The data is windowed at a fixed interval, the count of the elements is taken per-key, and then this count should be utilized to update the moving average. This requires maintaining state (local to each JVM instance/per-key) in the form of the result of the previous computation. Either a key-value based state store (as is available in Flink) or an implementation of scan semantics (where the result of the previous computation is passed as the initial value to the next invocation) would work; however, there does not seem to be a way to achieve either of these with the Beam API as it currently stands. 
>
> I noticed a related JIRA issue (https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress has been made. Is there vision/roadmap for this API? I would be happy to contribute to the project by beginning an implementation and would love to collaborate with anyone already working toward this goal.
>
> Thanks!
> David Desberg




  

Re: Stateful Stream Processing

Posted by David Desberg <da...@uber.com>.
It'd be awesome if you could ping me when you send it. I'll see if I can
send you an overview of what we'd be using in terms of state management
sometime this week. Can you elaborate on what you mean by the operation
being equivalent to a streaming combine with element-based triggering? Not
entirely sure I get the idea, my apologies.

Thanks,
David

On Fri, Jul 15, 2016 at 8:27 PM, Kenneth Knowles <kl...@google.com> wrote:

> On Jul 15, 2016 19:00, "David Desberg" <da...@uber.com> wrote:
> >
> > Kenn,
> >
> > Gotcha. When do you think that revision will be done, out of curiosity?
>
> I don't want to speculate, only because there's a lot going on in Beam
> right now. It should be soon. I'll ping you when I send it if you like,
> though it will be just a proposal for discussion and consensus on
> dev@beam.incubator.apache.org so you might want to wait until after that
> process.
>
> > Also, the EWMA example is a bit simplified; there are other forecasting
> algorithms we plan to implement which require more complex state
> management.
>
> I'd be interested in the details if you can share them; perhaps it will
> form a new and interesting use case for my proposal :-)
>
> > As of now, our plan is to create a sort of "scan" function which can be
> applied to a pipeline, with semantics as described in my previous email,
> and implement it for the Flink pipeline translator/runner. Any thoughts on
> this/is a similar construct at all part of the Beam roadmap? Trying to
> create something that would be useful for the community at-large, not just
> us.
>
> The uses of scan will definitely be addressed by the state support we are
> going to add to the model. We definitely want to focus on
> runner-independent developments. If you wanted to build out pipelines ASAP
> then such an operation is often equivalent to a streaming Combine with
> element-based triggering, though that will tempt you to break CombineFn's
> spec. You might just want to wait.
> Kenn
>
> >
> > David
> >
> > On Fri, Jul 15, 2016 at 3:51 PM, Kenneth Knowles <kl...@google.com> wrote:
> >>
> >> Hi David,
> >>
> >> I'm responsible for that feature; it was under design review at the
> advent of Beam (hence the low issue number). I'm working on prepping a
> revision that adds context and generalizes to Beam, rather than just
> Dataflow.
> >>
> >> For your use case, it isn't as simple as stateful decay if you care
> about event time, since windowing does not actually reorder your inputs.
> With a watermark-based trigger you are most likely close enough, though
> even then if the watermark passes the end of multiple windows in one update
> they are all permitted to be output, in any order. And transport between
> transforms is not required to be order preserving, though obviously in many
> backends it is, especially to support stateful pipelines. We want to
> talk about ordering explicitly in Beam, or else presume a lack of order.
> >>
> >> The good news is you can calculate the EWMA directly using sliding
> windows that are large enough so their tail has negligible contribution.
> This should work well and as a bonus you'll get the full time series.
> >>
> >> Apologies if I've misunderstood what you are hoping to accomplish.
> >>
> >> Kenn
> >>
> >> On Jul 15, 2016 3:06 PM, "David Desberg" <da...@uber.com>
> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > I'm working on a simple Beam app which should compute an exponential
> weighted moving average on a stream of data, by key. The data is windowed
> at a fixed interval, the count of the elements is taken per-key, and then
> this count should be utilized to update the moving average. This requires
> maintaining state (local to each JVM instance/per-key) in the form of the
> result of the previous computation. Either a key-value based state store
> (as is available in Flink) or an implementation of scan semantics (where
> the result of the previous computation is passed as the initial value to
> the next invocation) would work; however, there does not seem to be a way
> to achieve either of these with the Beam API as it currently stands.
> >> >
> >> > I noticed a related JIRA issue (
> https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress
> has been made. Is there vision/roadmap for this API? I would be happy to
> contribute to the project by beginning an implementation and would love to
> collaborate with anyone already working toward this goal.
> >> >
> >> > Thanks!
> >> > David Desberg
> >
> >
>
>
>

Re: Stateful Stream Processing

Posted by Kenneth Knowles <kl...@google.com>.
On Jul 15, 2016 19:00, "David Desberg" <da...@uber.com> wrote:
>
> Kenn,
>
> Gotcha. When do you think that revision will be done, out of curiosity?

I don't want to speculate, only because there's a lot going on in Beam
right now. It should be soon. I'll ping you when I send it if you like,
though it will be just a proposal for discussion and consensus on
dev@beam.incubator.apache.org so you might want to wait until after that
process.

> Also, the EWMA example is a bit simplified; there are other forecasting
algorithms we plan to implement which require more complex state
management.

I'd be interested in the details if you can share them; perhaps it will
form a new and interesting use case for my proposal :-)

> As of now, our plan is to create a sort of "scan" function which can be
applied to a pipeline, with semantics as described in my previous email,
and implement it for the Flink pipeline translator/runner. Any thoughts on
this/is a similar construct at all part of the Beam roadmap? Trying to
create something that would be useful for the community at-large, not just
us.

The uses of scan will definitely be addressed by the state support we are
going to add to the model. We definitely want to focus on
runner-independent developments. If you wanted to build out pipelines ASAP
then such an operation is often equivalent to a streaming Combine with
element-based triggering, though that will tempt you to break CombineFn's
spec. You might just want to wait.
Kenn

>
> David
>
> On Fri, Jul 15, 2016 at 3:51 PM, Kenneth Knowles <kl...@google.com> wrote:
>>
>> Hi David,
>>
>> I'm responsible for that feature; it was under design review at the
advent of Beam (hence the low issue number). I'm working on prepping a
revision that adds context and generalizes to Beam, rather than just
Dataflow.
>>
>> For your use case, it isn't as simple as stateful decay if you care
about event time, since windowing does not actually reorder your inputs.
With a watermark-based trigger you are most likely close enough, though
even then if the watermark passes the end of multiple windows in one update
they are all permitted to be output, in any order. And transport between
transforms is not required to be order preserving, though obviously in many
backends it is, especially to support stateful pipelines. We want to
talk about ordering explicitly in Beam, or else presume a lack of order.
>>
>> The good news is you can calculate the EWMA directly using sliding
windows that are large enough so their tail has negligible contribution.
This should work well and as a bonus you'll get the full time series.
>>
>> Apologies if I've misunderstood what you are hoping to accomplish.
>>
>> Kenn
>>
>> On Jul 15, 2016 3:06 PM, "David Desberg" <da...@uber.com> wrote:
>> >
>> > Hi all,
>> >
>> > I'm working on a simple Beam app which should compute an exponential
weighted moving average on a stream of data, by key. The data is windowed
at a fixed interval, the count of the elements is taken per-key, and then
this count should be utilized to update the moving average. This requires
maintaining state (local to each JVM instance/per-key) in the form of the
result of the previous computation. Either a key-value based state store
(as is available in Flink) or an implementation of scan semantics (where
the result of the previous computation is passed as the initial value to
the next invocation) would work; however, there does not seem to be a way
to achieve either of these with the Beam API as it currently stands.
>> >
>> > I noticed a related JIRA issue (
https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress has
been made. Is there vision/roadmap for this API? I would be happy to
contribute to the project by beginning an implementation and would love to
collaborate with anyone already working toward this goal.
>> >
>> > Thanks!
>> > David Desberg
>
>

Re: Stateful Stream Processing

Posted by David Desberg <da...@uber.com>.
Kenn,

Gotcha. When do you think that revision will be done, out of curiosity?
Also, the EWMA example is a bit simplified; there are other forecasting
algorithms we plan to implement which require more complex state
management.

As of now, our plan is to create a sort of "scan" function which can be
applied to a pipeline, with semantics as described in my previous email,
and implement it for the Flink pipeline translator/runner. Any thoughts on
this/is a similar construct at all part of the Beam roadmap? Trying to
create something that would be useful for the community at-large, not just
us.

David

On Fri, Jul 15, 2016 at 3:51 PM, Kenneth Knowles <kl...@google.com> wrote:

> Hi David,
>
> I'm responsible for that feature; it was under design review at the advent
> of Beam (hence the low issue number). I'm working on prepping a revision
> that adds context and generalizes to Beam, rather than just Dataflow.
>
> For your use case, it isn't as simple as stateful decay if you care about
> event time, since windowing does not actually reorder your inputs. With a
> watermark-based trigger you are most likely close enough, though even then
> if the watermark passes the end of multiple windows in one update they are
> all permitted to be output, in any order. And transport between transforms
> is not required to be order preserving, though obviously in many backends
> it is, especially to support stateful pipelines. We want to talk about
> ordering explicitly in Beam, or else presume a lack of order.
>
> The good news is you can calculate the EWMA directly using sliding windows
> that are large enough so their tail has negligible contribution. This
> should work well and as a bonus you'll get the full time series.
>
> Apologies if I've misunderstood what you are hoping to accomplish.
>
> Kenn
>
> On Jul 15, 2016 3:06 PM, "David Desberg" <da...@uber.com> wrote:
> >
> > Hi all,
> >
> > I'm working on a simple Beam app which should compute an exponential
> weighted moving average on a stream of data, by key. The data is windowed
> at a fixed interval, the count of the elements is taken per-key, and then
> this count should be utilized to update the moving average. This requires
> maintaining state (local to each JVM instance/per-key) in the form of the
> result of the previous computation. Either a key-value based state store
> (as is available in Flink) or an implementation of scan semantics (where
> the result of the previous computation is passed as the initial value to
> the next invocation) would work; however, there does not seem to be a way
> to achieve either of these with the Beam API as it currently stands.
> >
> > I noticed a related JIRA issue (
> https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress
> has been made. Is there vision/roadmap for this API? I would be happy to
> contribute to the project by beginning an implementation and would love to
> collaborate with anyone already working toward this goal.
> >
> > Thanks!
> > David Desberg
>

Re: Stateful Stream Processing

Posted by Kenneth Knowles <kl...@google.com>.
Hi David,

I'm responsible for that feature; it was under design review at the advent
of Beam (hence the low issue number). I'm working on prepping a revision
that adds context and generalizes to Beam, rather than just Dataflow.

For your use case, it isn't as simple as stateful decay if you care about
event time, since windowing does not actually reorder your inputs. With a
watermark-based trigger you are most likely close enough, though even then
if the watermark passes the end of multiple windows in one update they are
all permitted to be output, in any order. And transport between transforms
is not required to be order preserving, though obviously in many backends
it is, especially to support stateful pipelines. We want to talk about
ordering explicitly in Beam, or else presume a lack of order.

The good news is you can calculate the EWMA directly using sliding windows
that are large enough so their tail has negligible contribution. This
should work well and as a bonus you'll get the full time series.
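
A rough sketch of the sliding-window idea in plain Python, not Beam code. It assumes the unrolled, zero-seeded form of the EWMA, in which the count that is k intervals old gets weight alpha * (1 - alpha)^k. Everything older than the window is simply dropped, and the total weight dropped is (1 - alpha)^n for a window of n intervals. All names and parameter values are hypothetical.

```python
import math


def windowed_ewma(window, alpha):
    """EWMA computed from one sliding window of counts, oldest first.

    Only the counts inside the window contribute; older history is
    ignored rather than carried as state.
    """
    newest_first = reversed(window)
    return alpha * sum((1 - alpha) ** k * x for k, x in enumerate(newest_first))


def recursive_ewma(series, alpha):
    """Stateful reference: zero-seeded recurrence over the full history."""
    average = 0.0
    for x in series:
        average = alpha * x + (1 - alpha) * average
    return average


def tail_weight(n, alpha):
    """Total weight of the history that a window of n intervals drops."""
    return (1 - alpha) ** n


def intervals_needed(alpha, tolerance):
    """Smallest window length whose dropped tail weight is <= tolerance."""
    return math.ceil(math.log(tolerance) / math.log(1 - alpha))
```

Under these assumptions, the gap between `recursive_ewma` over the full history and `windowed_ewma` over the last n counts equals `tail_weight(n, alpha)` times the average as it stood just before the window began. `intervals_needed` therefore gives one way to size the window for a chosen tolerance. Because each window is evaluated on its own, no state has to survive between windows, and each window yields one point of the time series.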

Apologies if I've misunderstood what you are hoping to accomplish.

Kenn

On Jul 15, 2016 3:06 PM, "David Desberg" <da...@uber.com> wrote:
>
> Hi all,
>
> I'm working on a simple Beam app which should compute an exponential
weighted moving average on a stream of data, by key. The data is windowed
at a fixed interval, the count of the elements is taken per-key, and then
this count should be utilized to update the moving average. This requires
maintaining state (local to each JVM instance/per-key) in the form of the
result of the previous computation. Either a key-value based state store
(as is available in Flink) or an implementation of scan semantics (where
the result of the previous computation is passed as the initial value to
the next invocation) would work; however, there does not seem to be a way
to achieve either of these with the Beam API as it currently stands.
>
> I noticed a related JIRA issue (
https://issues.apache.org/jira/browse/BEAM-25) but it seems no progress has
been made. Is there vision/roadmap for this API? I would be happy to
contribute to the project by beginning an implementation and would love to
collaborate with anyone already working toward this goal.
>
> Thanks!
> David Desberg