Posted to dev@beam.apache.org by Aaron Dixon <at...@gmail.com> on 2019/10/29 20:15:37 UTC

aggregating over triggered results

Hi, I am new to Beam.

I would like to accumulate data over a 30-day period and perform a running
aggregation over this data, say every 10 minutes.

I could use a sliding window of 30 days every 10 minutes (triggering at end
of window) but this seems grossly inefficient (both in terms of # of
windows at play and # of events duplicated across these windows).

A more efficient strategy seems to be to use a sliding window of 60 days
every 30 days -- *triggering* every 10 minutes -- so that I'm guaranteed to
have 30 days' worth of data aggregated/combined in at least one of the 2
at-play sliding windows.

The last piece of this puzzle however would be to do a final global
aggregation over *only the keys from the latest trigger of the earlier
sliding window*.

But Beam does not seem to offer a way to orchestrate this, even though this
seems like it would be a pretty common or fundamental ask.

One thought I had was to re-window in a way that would isolate keys
triggered at the same time, in the same window but I don't see any
contracts from Beam that would allow an approach like that.

What am I missing?

Re: aggregating over triggered results

Posted by Robert Bradshaw <ro...@google.com>.
On Thu, Oct 31, 2019 at 8:48 PM Aaron Dixon <at...@gmail.com> wrote:
>
> First of all thank you for taking the time on this very clear and helpful message. Much appreciated.
>
> >I suppose one could avoid doing any pre-aggregation, and emit all of
> the events (with reified timestamp) in 60/30-day windows, then have a
> DoFn that filters on the events and computes each of the 10-minute
> aggregates over the "true" sliding window (4320 outputs). This could
> be cheaper if your events are very sparse, will be more expensive if
> they're very dense, and it's unclear what the tradeoff will be.
>
> This is exactly what I was doing (trying to do), reify the events and filter them out to compute my own desired window for the trigger. I have lots of events but each key has few events (in the thousands) but I think your point is that even this is not a win, the events overall would have to be quite sparse for it to be a win and by how much. So I can see why this is perhaps not a great thread to pursue.
>
> On another note, trying to use *periodic* triggers like this in *intermediate* pipeline stages and leverage them in downstream aggregations was something I was trying to do here and in a few other cases. (I'm new to Beam and triggers seemed fundamental so I expected to not get so lost trying to use them this way.) But at least at this stage of my understanding I think this was misplaced... periodic triggers seem primarily important say at the last stage of a pipeline where you may be writing updates to an actual sink/table.
>
> In other words suppose the above (60/30 day sliding) approach turned out to be more efficient... I still have no idea if, using Beam, I'd be able to properly regroup on the other side and pick out all the "latest triggered" events from the rest... or even know when I've got them. This was the source of my original question, but I'm now just thinking this is just not what people do in Beam pipelines... periodically trigger windows _in the middle_ of a pipeline. Am I on the right track in this thinking? If so, I wonder if the API would better reflect this? If it's a doomed strategy to try to periodically trigger 'into' downstream aggregations, why is the API so friendly to doing just this?

Yes, see e.g.
https://docs.google.com/document/d/17H2sBEtnoTSxjzlrz7rmKtX5E3F0mW1NpFQzWzSYOpY .
As an intermediate point (and stepping stone) we want to at least
have retractions:
https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE .
Triggering is an advanced, somewhat thorny (and not as fleshed
out) concept (e.g. it introduces non-determinism). It's basically
trying to solve the problem of seeing versions of aggregations that
are not gated by the watermark: either early, before the watermark has
declared that you've seen all the data, or late, in case the watermark
was wrong (watermarks can be heuristic, as perfect certainty might be
too slow/expensive).


Re: aggregating over triggered results

Posted by Aaron Dixon <at...@gmail.com>.
First of all thank you for taking the time on this very clear and helpful
message. Much appreciated.

>I suppose one could avoid doing any pre-aggregation, and emit all of
the events (with reified timestamp) in 60/30-day windows, then have a
DoFn that filters on the events and computes each of the 10-minute
aggregates over the "true" sliding window (4320 outputs). This could
be cheaper if your events are very sparse, will be more expensive if
they're very dense, and it's unclear what the tradeoff will be.

This is exactly what I was trying to do: reify the events and filter
them to compute my own desired window for the trigger. I have lots of
events, but each key has few events (in the thousands). I think your
point is that even this is not necessarily a win: the events overall
would have to be quite sparse for it to pay off, and it's unclear by
how much. So I can see why this is perhaps not a great thread to pursue.

On another note, trying to use *periodic* triggers like this in
*intermediate* pipeline stages and leverage them in downstream aggregations
was something I was trying to do here and in a few other cases. (I'm new to
Beam and triggers seemed fundamental so I expected to not get so lost
trying to use them this way.) But at least at this stage of my
understanding I think this was misplaced... periodic triggers seem
primarily important say at the last stage of a pipeline where you may be
writing updates to an actual sink/table.

In other words suppose the above (60/30 day sliding) approach turned out to
be more efficient... I still have no idea if, using Beam, I'd be able to
properly regroup on the other side and pick out all the "latest triggered"
events from the rest... or even know when I've got them. This was the
source of my original question, but I'm now just thinking this is just not
what people do in Beam pipelines... periodically trigger windows _in the
middle_ of a pipeline. Am I on the right track in this thinking? If so, I
wonder whether the API could better reflect this. If it's a doomed strategy to
try to periodically trigger 'into' downstream aggregations, why is the API
so friendly to doing just this?








Re: aggregating over triggered results

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon <at...@gmail.com> wrote:
>
> Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and meant to hit user@, but as we're here could you clarify your two points, however--

No problem. This is veering into dev@ territory anyway :).

> 1) I am under the impression that the 4,000 sliding windows approach (30 days every 10m) will re-evaluate my combine aggregation every 10m whereas with the two-window approach my Combine aggregation would evolve iteratively, only merging new results into the aggregation.
>
> If there's a cross-window optimization occurring that would allow iterative combining _across windows_, given the substantial order of magnitude difference in scale at play, is it safe to consider such 'internal optimization detail' part of the platform contract (Dataflow's, say)? Otherwise it would be hard to lean on this from a production system that will live into the future.

OK, let's first define exactly what (I think) you're trying to
compute. Let's label windows by their (upper) endpoint. So, every 10
minutes you have a window W_t and an aggregate Aggr(e for e in
all_events if t - 30days <= timestamp(e) < t).

The way this is computed in Beam is by storing a map W_t ->
RunningAggregate and whenever we see an element with timestamp T we
assign it to the set of windows S = {W_t : T in W_t} (in this case
there would be 30*24*6 = 4320 of them) and subsequently update all the
running aggregates. When we are sure we've seen all elements up to t
(the watermark) we release window W_t with its computed aggregate
downstream.
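
To make the bookkeeping above concrete, here is a minimal sketch in plain Python. It is not Beam's implementation, only an illustration of "one running aggregate per window"; the names `window_ends` and `PerWindowSum` are made up for this example, timestamps are assumed to be integer seconds, and the aggregate is assumed to be a simple sum.

```python
from collections import defaultdict

MINUTE = 60
DAY = 24 * 60 * MINUTE
SIZE = 30 * DAY        # window length
PERIOD = 10 * MINUTE   # a new window ends every 10 minutes


def window_ends(ts, size=SIZE, period=PERIOD):
    """Upper endpoints t of every window [t - size, t) that contains ts."""
    first_end = (ts // period + 1) * period  # smallest endpoint strictly after ts
    return range(first_end, first_end + size, period)


class PerWindowSum:
    """Hypothetical stand-in for the map W_t -> RunningAggregate."""

    def __init__(self):
        self.running = defaultdict(int)

    def add(self, ts, value):
        # Every element updates all size/period = 4320 windows it falls into.
        for end in window_ends(ts):
            self.running[end] += value

    def release(self, watermark):
        """Emit and drop every window whose end is at or before the watermark."""
        ready = sorted(end for end in self.running if end <= watermark)
        return [(end, self.running.pop(end)) for end in ready]
```

Each `add` touches 4320 entries, which is the per-element cost the discussion is about; `release` is cheap because the aggregate is already computed.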

An alternative that's often proposed, and works only for aligned
sliding windows, is to instead store a map of 10-minute buckets to
running aggregates, and whenever an element comes in we add its value
to the aggregate of that bucket. The write side is cheaper, but every time
the watermark tells us we're able to release a window we then have to
compute an aggregate over all 4320 of these buckets.
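
A rough sketch of this bucketed alternative, again in plain Python rather than Beam, might look like the following. `BucketedSum` is a hypothetical name, the aggregate is assumed to be a sum, and window endpoints are assumed to be aligned to 10-minute boundaries.

```python
from collections import defaultdict

PERIOD = 10 * 60                   # 10-minute bucket, in seconds
BUCKETS_PER_WINDOW = 30 * 24 * 6   # 4320 buckets span 30 days


class BucketedSum:
    """One running aggregate per 10-minute bucket instead of per window."""

    def __init__(self):
        self.buckets = defaultdict(int)  # bucket start -> running aggregate

    def add(self, ts, value):
        # A single state update per element, however many windows contain it.
        self.buckets[ts - ts % PERIOD] += value

    def release(self, window_end):
        """Aggregate for the window [window_end - 30 days, window_end)."""
        starts = range(window_end - BUCKETS_PER_WINDOW * PERIOD, window_end, PERIOD)
        # The read side pays instead: up to 4320 bucket lookups per release.
        return sum(self.buckets.get(start, 0) for start in starts)
```

The cost has moved from write time to release time, which is the tradeoff described above.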

A further extension, if the aggregation function is reversible, is to
keep a running total, and every time we release a window, we add the
newest bucket's contribution to this running total and remove the
oldest bucket's contribution from it. If the computation is not
reversible, we can compute a "binary tree" of aggregates (e.g. 10-min
buckets, 20-min buckets, 40-min buckets, ...) and perform log(N)
aggregations each time an element comes in and log(N) every time a
window is released.
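
For the reversible case, a minimal sketch (plain Python, hypothetical class name `SlidingRunningSum`) could look like this. It assumes a sum, which can be undone by subtraction, that windows are released strictly in order, and that no element arrives after the window containing it has been released; it also assumes no data exists before the first released window.

```python
from collections import defaultdict

PERIOD = 10 * 60                   # 10-minute bucket, in seconds
BUCKETS_PER_WINDOW = 30 * 24 * 6   # 4320 buckets span 30 days


class SlidingRunningSum:
    """Running total updated by one bucket in and one bucket out per release."""

    def __init__(self, first_window_end):
        self.buckets = defaultdict(int)   # bucket start -> running aggregate
        self.next_end = first_window_end  # end of the next window to release
        self.total = 0                    # aggregate of the last released window

    def add(self, ts, value):
        self.buckets[ts - ts % PERIOD] += value

    def release_next(self):
        newest = self.next_end - PERIOD                 # bucket entering the window
        oldest = newest - BUCKETS_PER_WINDOW * PERIOD   # bucket leaving the window
        self.total += self.buckets.get(newest, 0)
        self.total -= self.buckets.pop(oldest, 0)       # also frees that state
        end, self.next_end = self.next_end, self.next_end + PERIOD
        return end, self.total
```

Each release now costs two bucket reads instead of 4320, but the 4320 buckets still have to be stored until they slide out of the window.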

Each of these is more specialized to the exact shape of sliding
windows, and though they may allow us to save some compute, they still
require the storage of 4320 (or even 4320 log(N)) pieces of state. Often
the compute is virtually free compared to the cost of reading/writing
state, which means there are fewer (or even no) advantages for the
specialized methods (depending on how sparse the data is), especially
if one does blind writes and only has to absorb latency on reads.

As mentioned, Beam does the first, and though we've looked at the
others there has not been a compelling case made that they would
actually help much if at all in practice.


As for what would be computed with 60/30-day sliding windows with 10
minute triggers, the output would be (approximately, due to mixing
processing and event time, though at this granularity they are likely
to line up mostly), W_t_i where t jumps in 30 day increments and i
jumps in 10 minute increments and the aggregate assigned to W_t_i is
Aggr(e for e in all_events where t-60days < timestamp(e) < i) which is
not quite the same thing. E.g. let's say we have a single event
(aggregation count) every 10 minute interval. The sliding windows
output would give 4320 events for every 30-day window, published every
10 minutes. The 60/30-day would re-publish each of the windows [0,
60day), [30day, 90day), [60day, 120day), ... with the values 1, 2, 3,
4, ..., 8640. In other words, at time t=45day, the window [0, 60day)
would trigger with a count of the 6480 events that had fallen in this
window so far, and the [30day, 90day) window would trigger with the
2160 seen so far, neither of which is an accurate representation of
"from now until 30 days ago."
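
The t=45day numbers can be checked with a few lines of arithmetic. This sketch assumes exactly one event at the start of every 10-minute interval beginning at time 0, and that triggers fire in step with event time; `count_events` is a helper invented for this illustration.

```python
MINUTE = 60
DAY = 24 * 60 * MINUTE
PERIOD = 10 * MINUTE


def count_events(start, end, now):
    """Events with timestamp in [start, end) that have been seen by time `now`."""
    lo = max(start, 0)   # no events exist before time 0
    hi = min(end, now)   # events after `now` have not arrived yet
    return max(0, (hi - lo) // PERIOD)


now = 45 * DAY
pane_a = count_events(0, 60 * DAY, now)           # pane of window [0, 60day)
pane_b = count_events(30 * DAY, 90 * DAY, now)    # pane of window [30day, 90day)
wanted = count_events(now - 30 * DAY, now, now)   # the trailing 30 days
print(pane_a, pane_b, wanted)  # prints "6480 2160 4320"
```

Neither pane equals the trailing 30-day count, and no simple combination of the two aggregates recovers it without access to the underlying events.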

I suppose one could avoid doing any pre-aggregation, and emit all of
the events (with reified timestamp) in 60/30-day windows, then have a
DoFn that filters on the events and computes each of the 10-minute
aggregates over the "true" sliding window (4320 outputs). This could
be cheaper if your events are very sparse, will be more expensive if
they're very dense, and it's unclear what the tradeoff will be.
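
As a rough illustration of the "reify and filter" idea, the per-key logic such a DoFn would run might look like the sketch below. This is plain Python with no Beam API involved; `trailing_aggregates` is a hypothetical helper, and it assumes the raw (timestamp, value) pairs of one 60-day window for one key have already been grouped together.

```python
from bisect import bisect_left

MINUTE = 60
DAY = 24 * 60 * MINUTE
SIZE = 30 * DAY        # the "true" sliding window length
PERIOD = 10 * MINUTE   # spacing between outputs


def trailing_aggregates(events, coarse_start, combine=sum):
    """Yield (t, aggregate over [t - SIZE, t)) for the second half of a 60-day window.

    events: (timestamp, value) pairs from the coarse window
            [coarse_start, coarse_start + 2 * SIZE).
    """
    events = sorted(events, key=lambda e: e[0])
    timestamps = [ts for ts, _ in events]
    first_end = coarse_start + SIZE + PERIOD
    last_end = coarse_start + 2 * SIZE
    for t in range(first_end, last_end + PERIOD, PERIOD):
        lo = bisect_left(timestamps, t - SIZE)  # first event with ts >= t - SIZE
        hi = bisect_left(timestamps, t)         # first event with ts >= t
        yield t, combine(value for _, value in events[lo:hi])
```

Every call produces 4320 outputs and re-reads the raw events for each one, so the cost grows with the number of events per key; that is why it only looks attractive when events are sparse.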

I would try the straightforward approach and see if that works,
because it might well be good enough that the additional
complexity isn't worth it (if there is even a measurable improvement). You can
play with various windowing/triggering strategies at
https://window-explorer.appspot.com/ (though unfortunately processing
time triggers are not represented).

> 2) When you say "regardless of the how the problem is structured" there are 4,000 stored 'sub-aggregations', even in the two-window approach--why is that so? Isn't the volume of panes produced by a trigger a function of what keys have actually received new values *in the window*?

True, if most 10-minute intervals have no events then there are
further optimizations one can do.

> Thanks for help in understanding these details. I want to make good use of Beam and hope to contribute back at some point (docs/writing etc), once I can come to terms with all of these pieces.
>
> On 2019/10/29 20:39:18, Robert Bradshaw <ro...@google.com> wrote:
> > No matter how the problem is structured, computing 30 day aggregations
> > for every 10 minute window requires storing at least 30day/10min =
> > ~4000 sub-aggregations. In Beam, the elements themselves are not
> > stored in every window, only the intermediate aggregates.
> >
> > I second Luke's suggestion to try it out and see if this is indeed a
> > prohibitive bottleneck.
> >
> > On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > > You should first try the obvious answer of using a sliding window of 30 days every 10 minutes before you try the 60 days every 30 days.
> > > Beam has some optimizations which will assign a value to multiple windows and only process that value once even if it's in many windows. If that doesn't perform well, then come back to dev@ and look to optimize.

Re: aggregating over triggered results

Posted by Aaron Dixon <at...@gmail.com>.
Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and meant to hit user@, but as we're here could you clarify your two points, however--

1) I am under the impression that the 4,000 sliding windows approach (30 days every 10m) will re-evaluate my combine aggregation every 10m whereas with the two-window approach my Combine aggregation would evolve iteratively, only merging new results into the aggregation. 

If there's a cross-window optimization occurring that would allow iterative combining _across windows_, given the substantial order of magnitude difference in scale at play, is it safe to consider such 'internal optimization detail' part of the platform contract (Dataflow's, say)? Otherwise it would be hard to lean on this from a production system that will live into the future.

2) When you say "regardless of the how the problem is structured" there are 4,000 stored 'sub-aggregations', even in the two-window approach--why is that so? Isn't the volume of panes produced by a trigger a function of what keys have actually received new values *in the window*?

Thanks for help in understanding these details. I want to make good use of Beam and hope to contribute back at some point (docs/writing etc), once I can come to terms with all of these pieces.

On 2019/10/29 20:39:18, Robert Bradshaw <ro...@google.com> wrote: 
> No matter how the problem is structured, computing 30 day aggregations
> for every 10 minute window requires storing at least 30day/10min =
> ~4000 sub-aggregations. In Beam, the elements themselves are not
> stored in every window, only the intermediate aggregates.
> 
> I second Luke's suggestion to try it out and see if this is indeed a
> prohibitive bottleneck.
> 
> On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik <lc...@google.com> wrote:
> >
> > You should first try the obvious answer of using a sliding window of 30 days every 10 minutes before you try the 60 days every 30 days.
> > Beam has some optimizations which will assign a value to multiple windows and only process that value once even if it's in many windows. If that doesn't perform well, then come back to dev@ and look to optimize.
> >
> > On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon <at...@gmail.com> wrote:
> >>
> >> Hi I am new to Beam.
> >>
> >> I would like to accumulate data over 30 day period and perform a running aggregation over this data, say every 10 minutes.
> >>
> >> I could use a sliding window of 30 days every 10 minutes (triggering at end of window) but this seems grossly inefficient (both in terms of # of windows at play and # of events duplicated across these windows).
> >>
> >> A more efficient strategy seems to be to use a sliding window of 60 days every 30 days -- triggering every 10 minutes -- so that I'm guaranteed to have 30 days worth of data aggregated/combined in at least one of the 2 at-play sliding windows.
> >>
> >> The last piece of this puzzle however would be to do a final global aggregation over only the keys from the latest trigger of the earlier sliding window.
> >>
> >> But Beam does not seem to offer a way to orchestrate this. Even though this seems like it would be a pretty common or fundamental ask.
> >>
> >> One thought I had was to re-window in a way that would isolate keys triggered at the same time, in the same window but I don't see any contracts from Beam that would allow an approach like that.
> >>
> >> What am I missing?
> >>
> >>
> 

Re: aggregating over triggered results

Posted by Robert Bradshaw <ro...@google.com>.
No matter how the problem is structured, computing 30-day aggregations
for every 10-minute window requires storing at least 30 days / 10 min =
4320 (~4000) sub-aggregations. In Beam, the elements themselves are not
stored in every window, only the intermediate aggregates.

I second Luke's suggestion to try it out and see if this is indeed a
prohibitive bottleneck.
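
The count of sub-aggregations quoted above is plain arithmetic, and can be checked with a short standard-library sketch. The variable names are illustrative only; nothing here calls Beam.

```python
from datetime import timedelta

# Parameters from the thread: a 30-day aggregate refreshed every 10 minutes.
window_size = timedelta(days=30)
period = timedelta(minutes=10)

# Number of 10-minute slices that make up one 30-day window.
sub_aggregations = window_size // period
print(sub_aggregations)  # 4320

# The alternative proposed in the thread: 60-day windows every 30 days.
windows_per_element = timedelta(days=60) // timedelta(days=30)
print(windows_per_element)  # 2
```

The first number is the "~4000" figure; the second is the count of at-play windows in the 60/30 scheme.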

On Tue, Oct 29, 2019 at 1:29 PM Luke Cwik <lc...@google.com> wrote:
>
> You should first try the obvious answer of using a sliding window of 30 days every 10 minutes before you try the 60 days every 30 days.
> Beam has some optimizations which will assign a value to multiple windows and only process that value once even if it's in many windows. If that doesn't perform well, then come back to dev@ and look to optimize.
>
> On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon <at...@gmail.com> wrote:
>>
>> Hi I am new to Beam.
>>
>> I would like to accumulate data over 30 day period and perform a running aggregation over this data, say every 10 minutes.
>>
>> I could use a sliding window of 30 days every 10 minutes (triggering at end of window) but this seems grossly inefficient (both in terms of # of windows at play and # of events duplicated across these windows).
>>
>> A more efficient strategy seems to be to use a sliding window of 60 days every 30 days -- triggering every 10 minutes -- so that I'm guaranteed to have 30 days worth of data aggregated/combined in at least one of the 2 at-play sliding windows.
>>
>> The last piece of this puzzle however would be to do a final global aggregation over only the keys from the latest trigger of the earlier sliding window.
>>
>> But Beam does not seem to offer a way to orchestrate this. Even though this seems like it would be a pretty common or fundamental ask.
>>
>> One thought I had was to re-window in a way that would isolate keys triggered at the same time, in the same window but I don't see any contracts from Beam that would allow an approach like that.
>>
>> What am I missing?
>>
>>

Re: aggregating over triggered results

Posted by Luke Cwik <lc...@google.com>.
You should first try the obvious answer of using a sliding window of 30
days every 10 minutes before you try the 60 days every 30 days.
Beam has some optimizations which will assign a value to multiple windows
and only process that value once even if it's in many windows. If that
doesn't perform well, then come back to dev@ and look to optimize.
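
To make the fan-out of the two window configurations concrete, here is a rough sketch of sliding-window assignment in plain Python. It assumes windows are half-open intervals [start, start + size) aligned to multiples of the period; the function name sliding_window_starts is hypothetical, and this is not Beam's implementation, nor does it show the process-once optimization mentioned above.

```python
from typing import List

MINUTE = 60
DAY = 24 * 60 * MINUTE


def sliding_window_starts(ts: int, size: int, period: int) -> List[int]:
    """Return the start times of every window [start, start + size)
    that contains timestamp ts, with starts aligned to the period."""
    last_start = ts - (ts % period)  # latest aligned start at or before ts
    starts = []
    start = last_start
    while start > ts - size:  # window still covers ts
        starts.append(start)
        start -= period
    return starts


# An arbitrary event timestamp, in seconds (assumed value for illustration).
event_ts = 1_572_380_137

# 30 days every 10 minutes: the event lands in thousands of windows.
fine = sliding_window_starts(event_ts, 30 * DAY, 10 * MINUTE)

# 60 days every 30 days: the event lands in exactly two windows.
coarse = sliding_window_starts(event_ts, 60 * DAY, 30 * DAY)

print(len(fine), len(coarse))
```

The gap between the two lengths is the scale difference the original question is worried about; whether it matters in practice is what trying the simple approach first would reveal.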

On Tue, Oct 29, 2019 at 1:22 PM Aaron Dixon <at...@gmail.com> wrote:

> Hi I am new to Beam.
>
> I would like to accumulate data over 30 day period and perform a running
> aggregation over this data, say every 10 minutes.
>
> I could use a sliding window of 30 days every 10 minutes (triggering at
> end of window) but this seems grossly inefficient (both in terms of # of
> windows at play and # of events duplicated across these windows).
>
> A more efficient strategy seems to be to use a sliding window of 60 days
> every 30 days -- *triggering* every 10 minutes -- so that I'm guaranteed
> to have 30 days worth of data aggregated/combined in at least one of the 2
> at-play sliding windows.
>
> The last piece of this puzzle however would be to do a final global
> aggregation over *only the keys from the latest trigger of the earlier
> sliding window*.
>
> But Beam does not seem to offer a way to orchestrate this. Even though
> this seems like it would be a pretty common or fundamental ask.
>
> One thought I had was to re-window in a way that would isolate keys
> triggered at the same time, in the same window but I don't see any
> contracts from Beam that would allow an approach like that.
>
> What am I missing?
>
>
>