Posted to user@beam.apache.org by Sam Stephens <sa...@gmail.com> on 2019/10/10 15:44:58 UTC

Joining PCollections to aggregates of themselves

My team and I have been puzzling for a while over how to solve a specific
problem.

Say you have an input stream of tuples:

  <String uuid, Integer value>

And you want to output a stream containing:

  <String uuid, Double average>

Where the average is an aggregation over a 10 minute sliding window of the
"value" field.

There are three extra requirements:
A) We want to output a single result for each input tuple
B) We want to output a result as early as possible after the input arrives
(low latency)
C) We want the average value in result_i to have *seen* the value from
input_i

An illustration of the input stream with corresponding output

Time: 00:00:00
Input: <UUID1, 1>
Output: <UUID1, 1.0>

Time: 00:02:00
Input: <UUID2, 2>
Output: <UUID2, 1.5>

Time: 00:08:00
Input: <UUID3, 6>
Output: <UUID3, 3.0>

Time: 00:13:00
Input: <UUID4, 4>
Output: <UUID4, 5.0>
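
As a sanity check on the illustration above, here is a minimal plain-Java sketch (no Beam dependency) that recomputes each output as the mean of every value whose timestamp falls in the 10 minutes up to and including the element's own timestamp. The class name TrailingMeanExample, the method trailingMeans, the use of whole minutes as timestamps, and the choice to exclude an element that is exactly 10 minutes old are all assumptions made for illustration.

```java
/** Hypothetical helper, not part of Beam: replays the illustration above. */
final class TrailingMeanExample {
    static final long WINDOW_MINUTES = 10;

    // For element i, average every value whose timestamp lies in
    // (t_i - 10 min, t_i]. Element i itself is always included.
    static double[] trailingMeans(long[] minutes, int[] values) {
        double[] means = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            long sum = 0;
            int count = 0;
            for (int j = 0; j < values.length; j++) {
                boolean inWindow = minutes[j] <= minutes[i]
                        && minutes[j] > minutes[i] - WINDOW_MINUTES;
                if (inWindow) {
                    sum += values[j];
                    count++;
                }
            }
            means[i] = (double) sum / count;
        }
        return means;
    }

    public static void main(String[] args) {
        long[] minutes = {0, 2, 8, 13};   // 00:00, 00:02, 00:08, 00:13
        int[] values = {1, 2, 6, 4};
        double[] means = trailingMeans(minutes, values);
        for (int i = 0; i < means.length; i++) {
            System.out.println("UUID" + (i + 1) + " -> " + means[i]);
        }
    }
}
```

Under those assumptions the four means come out as 1.0, 1.5, 3.0 and 5.0, which matches the outputs listed above.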

The issue we have is that without some magic tricks and hacky code,
achieving all three extra requirements is tough. A naive solution looks like
this (Beam pseudo-code):


PCollectionView<Double> agg = input
     .apply(Windows.sliding(10mins, 1sec hops)
         .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
     .apply(Mean.globally())
     .apply(View.asSingleton());

PCollection<Tuples> output = input
     .apply(ParDo.of(new Joiner()).withSideInputs(agg));


The problem is that there's a race condition: input elements can pass
through the Joiner DoFn before the side input corresponding to that element
is present. This makes the A, B, C requirements listed above difficult to
satisfy.

Has anyone solved a similar problem to this before? Any neat ideas?

Re: Joining PCollections to aggregates of themselves

Posted by Kenneth Knowles <ke...@apache.org>.
This seems like a great example of a use for a stateful DoFn. It has
essentially the same structure as the example on the Beam blog but is more
meaningful.

Kenn

On Fri, Oct 11, 2019 at 12:38 PM Robert Bradshaw <ro...@google.com>
wrote:

> OK, the only way to do this would be via a non-deterministic stateful
> DoFn that buffers elements as they come in and computes averages by
> looking at the buffer each time.
>
> This could also be represented with an extension to window merging and
> a join, where the trigger would be explicitly used to control the
> balance between latency and correctness.

Re: Joining PCollections to aggregates of themselves

Posted by Robert Bradshaw <ro...@google.com>.
OK, the only way to do this would be via a non-deterministic stateful
DoFn that buffers elements as they come in and computes averages by
looking at the buffer each time.

This could also be represented with an extension to window merging and
a join, where the trigger would be explicitly used to control the
balance between latency and correctness.

On Fri, Oct 11, 2019 at 8:01 AM Sam Stephens <sa...@gmail.com> wrote:
>
> Yes, the requirement is basically to enrich an event stream with values computed over arbitrary other event streams (including the event stream being enriched) and to do this with as low latency as possible.
>
> Of course, the values derived from other event streams might not be included even if they occur before the event being enriched (even if "before" is in both the event-time and processing-time sense). But this is easier to swallow because there's no obvious causal dependency between that aggregate value and the event being enriched.
>
> I hope that made sense.

Re: Joining PCollections to aggregates of themselves

Posted by Sam Stephens <sa...@gmail.com>.
On 2019/10/10 18:23:46, Eugene Kirpichov <ki...@google.com> wrote: 
> " input elements can pass through the Joiner DoFn before the sideInput
> corresponding to that element is present"
> 
> I don't think this is correct. Runners will evaluate a DoFn with side
> inputs on elements in a given window only after all side inputs are ready
> (have triggered at least once) in this window, so your code should be safe.
> However, runners will not rerun the DoFn with side inputs on subsequent
> triggerings of the side inputs, so you won't be able to update the results.

Yes, but the second or third time an element falling into a given window is processed by the Joiner DoFn, the side input may not yet reflect these new elements. So the side input having triggered at least once is no guarantee that it is up to date.

On 2019/10/10 18:35:21, Robert Bradshaw <ro...@google.com> wrote: 

> Time: 00:08:00
> Input: <UUID3, 6>
Output: <UUID3, 6>

> 
> Time: 00:13:00
> Input: <UUID4, 4>

Output: <UUID4, 5> // average 4 & 6

> 
> Time: 00:00:00
> Input: <UUID1, 1>

Output: <UUID1, 1> // average 1
> 
> Time: 00:02:00
> Input: <UUID2, 2>

Output: <UUID2, 1.5> // average 1 & 2

I'd say the least surprising result here is that the aggregate includes the best available information at the time of processing. So yes, it is sensitive to the order of arrival; that's unavoidable, I think.
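
To make the "best available information at the time of processing" reading concrete, here is a minimal plain-Java sketch that is not Beam code. It keeps every element seen so far in a buffer and, for each arriving element, averages the buffered values whose event time lies in the 10 minutes up to and including that element's event time. The class ArrivalOrderExample, its process method, whole-minute timestamps and the exclusive 10-minute boundary are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Beam API: results depend on arrival order. */
final class ArrivalOrderExample {
    static final long WINDOW_MINUTES = 10;

    // Everything seen so far, as {eventMinute, value} pairs.
    private final List<long[]> buffer = new ArrayList<>();

    // Called once per element, in arrival (processing-time) order.
    double process(long eventMinute, int value) {
        buffer.add(new long[] {eventMinute, value});
        long sum = 0;
        int count = 0;
        for (long[] e : buffer) {
            // Only elements already buffered can contribute.
            if (e[0] <= eventMinute && e[0] > eventMinute - WINDOW_MINUTES) {
                sum += e[1];
                count++;
            }
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        ArrivalOrderExample fn = new ArrivalOrderExample();
        long[] minutes = {8, 13, 0, 2};   // arrival order from the example
        int[] values = {6, 4, 1, 2};
        for (int i = 0; i < values.length; i++) {
            double mean = fn.process(minutes[i], values[i]);
            System.out.println("minute " + minutes[i] + " -> " + mean);
        }
    }
}
```

Fed the out-of-order arrivals above, this model yields 6.0, 5.0, 1.0 and 1.5, the same values as the answers given inline.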

> 
> Are you really trying to emit elements with the mean of all elements
> with timestamp up to 10 minutes prior to the current value? That's a
> bit different than sliding windows. In that case you could do
> something with a Stateful DoFn that buffers elements and for each
> incoming element sets a timer at T which then reads the buffer,
> computes the output, and discards elements older than 10 minutes. You
> could also possibly do this with a custom WindowFn.
> 

Yes, the requirement is basically to enrich an event stream with values computed over arbitrary other event streams (including the event stream being enriched) and to do this with as low latency as possible.

Of course, the values derived from other event streams might not be included even if they occur before the event being enriched (even if "before" is in both the event-time and processing-time sense). But this is easier to swallow because there's no obvious causal dependency between that aggregate value and the event being enriched.

I hope that made sense.

Re: Joining PCollections to aggregates of themselves

Posted by Robert Bradshaw <ro...@google.com>.
Looking at the naive solution

PCollectionView<Double> agg = input
     .apply(Windows.sliding(10mins, 1sec hops)
         .trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
     .apply(Mean.globally())
     .apply(View.asSingleton());

PCollection<Tuples> output = input
     .apply(ParDo.of(new Joiner()).withSideInputs(agg));

the constraint (C) is being violated because of your
AfterPane.elementCountAtLeast trigger, which will emit averages before
having seen all the values that should contribute to the average. It
will emit a (speculative) average as soon as the first element comes
in, and at each subsequent element, but this order may not correspond
to the order in which elements get passed into the Joiner.

Removing this trigger would hold back processing in the Joiner until this
element (as well as all other elements in the window) has been seen and
the final average has been produced.
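
A small plain-Java model of the speculative firings may help here. It is not Beam code: it assumes a single window, one firing per arriving element, and accumulating panes, none of which is stated in the pseudo-code. The class SpeculativePanesExample and the method paneMeans are hypothetical names.

```java
/** Hypothetical model of per-element trigger firings; not Beam code. */
final class SpeculativePanesExample {
    // Mean emitted by the i-th firing, assuming accumulating panes and
    // one firing per arriving element within a single window.
    static double[] paneMeans(int[] valuesInArrivalOrder) {
        double[] panes = new double[valuesInArrivalOrder.length];
        long sum = 0;
        for (int i = 0; i < valuesInArrivalOrder.length; i++) {
            sum += valuesInArrivalOrder[i];
            panes[i] = (double) sum / (i + 1);
        }
        return panes;
    }

    public static void main(String[] args) {
        double[] panes = paneMeans(new int[] {1, 2, 6});
        // If the Joiner handles the third element (value 6) while only the
        // second pane has reached the side input, it reads panes[1], which
        // has not seen 6, so requirement (C) is violated.
        System.out.println("stale pane: " + panes[1]);
        System.out.println("up-to-date pane: " + panes[2]);
    }
}
```

In this model the three panes carry 1.0, 1.5 and 3.0. Which of them the Joiner observes for a given element depends on timing, not on the element itself.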

Your computation seems to be sensitive to the order in which elements
arrive. What would you expect the output to be for this arrival order?

Time: 00:08:00
Input: <UUID3, 6>
Output: <UUID3, ???>

Time: 00:13:00
Input: <UUID4, 4>
Output: <UUID4, ???>

Time: 00:00:00
Input: <UUID1, 1>
Output: <UUID1, ???>

Time: 00:02:00
Input: <UUID2, 2>
Output: <UUID2, ???>

Are you really trying to emit elements with the mean of all elements
with timestamp up to 10 minutes prior to the current value? That's a
bit different than sliding windows. In that case you could do
something with a Stateful DoFn that buffers elements and for each
incoming element sets a timer at T which then reads the buffer,
computes the output, and discards elements older than 10 minutes. You
could also possibly do this with a custom WindowFn.
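
The buffer-and-timer idea can be simulated in plain Java roughly as follows. This is a sketch only, not a Beam DoFn: it assumes distinct whole-minute timestamps and no late data, and it models "timer at T fires" by handling buffered elements in event-time order. The names BufferAndTimerExample and fireTimers are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;

/** Hypothetical simulation of the buffer-and-timer idea; not Beam API. */
final class BufferAndTimerExample {
    static final long WINDOW_MINUTES = 10;

    // elements[i] = {eventMinute, value}, given in arrival order.
    // Returns {eventMinute, mean} rows in the order the timers would fire.
    static double[][] fireTimers(long[][] elements) {
        // Buffer everything, then let the timers fire in event-time order,
        // as they would once the watermark passes each timestamp T.
        long[][] byEventTime = elements.clone();
        Arrays.sort(byEventTime, Comparator.comparingLong((long[] e) -> e[0]));

        Deque<long[]> window = new ArrayDeque<>();
        long sum = 0;
        double[][] out = new double[byEventTime.length][];
        for (int i = 0; i < byEventTime.length; i++) {
            long[] e = byEventTime[i];
            window.addLast(e);
            sum += e[1];
            // Discard buffered elements 10 minutes old or older relative to T.
            while (window.peekFirst()[0] <= e[0] - WINDOW_MINUTES) {
                sum -= window.pollFirst()[1];
            }
            out[i] = new double[] {e[0], (double) sum / window.size()};
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] arrivals = {{8, 6}, {13, 4}, {0, 1}, {2, 2}};
        for (double[] row : fireTimers(arrivals)) {
            System.out.println("minute " + row[0] + " -> " + row[1]);
        }
    }
}
```

With the out-of-order arrivals from the example above, this model produces 1.0, 1.5, 3.0 and 5.0 for minutes 0, 2, 8 and 13, independent of arrival order. The price in a real pipeline would be that each result waits until its timer can fire.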

On Thu, Oct 10, 2019 at 10:56 AM rahul patwari
<ra...@gmail.com> wrote:
>
> With Stateful DoFn, each instance of DoFn will have elements which belong to the same window and have the same key. So, the parallelism is limited by [no. of keys * no. of Windows]

On a practical note, no runner actually parallelizes across windows
(and indeed sometimes, e.g. for merging windows like sessions, it's
not actually possible).

> In batch mode, as all the elements belong to the same window, i.e. Global Window, the parallelism will be limited by the [no. of keys]. So, if you only have one key, only one instance of DoFn will be running.
>
> AFAIK, it is not possible to pass the elements through the DoFn in the desired order.

That is correct.

Re: Joining PCollections to aggregates of themselves

Posted by rahul patwari <ra...@gmail.com>.
With Stateful DoFn, each instance of DoFn will have elements which belong
to the same window and have the same key. So, the parallelism is limited by
[no. of keys * no. of Windows]
In batch mode, as all the elements belong to the same window, i.e. Global
Window, the parallelism will be limited by the [no. of keys]. So, if you
only have one key, only one instance of DoFn will be running.

AFAIK, it is not possible to pass the elements through the DoFn in the
desired order.

On Thu, Oct 10, 2019 at 10:11 PM Sam Stephens <sa...@gmail.com>
wrote:

> Hi Rahul,
>
> Thanks for the response.
>
> I did consider State, but actually I was hesitant because of a different
> requirement that I didn't specify: the same pipeline should work for batch
> and stream modes. I'm not sure how stateful DoFns behave in the batch
> world: can you get Beam to pass the elements through the DoFn in a desired
> order, e.g. sorted by event time?
>
> Most aggregations we're likely to be running are per-key rather than
> global, so the parallelism issue might not be such a big deal.
>
> Regards,
> Sam

Re: Joining PCollections to aggregates of themselves

Posted by Sam Stephens <sa...@gmail.com>.
Hi Rahul,

Thanks for the response.

I did consider State, but actually I was hesitant because of a different
requirement that I didn't specify: the same pipeline should work for batch
and stream modes. I'm not sure how stateful DoFns behave in the batch
world: can you get Beam to pass the elements through the DoFn in a desired
order, e.g. sorted by event time?

Most aggregations we're likely to be running are per-key rather than
global, so the parallelism issue might not be such a big deal.

Regards,
Sam

On Thu, Oct 10, 2019 at 5:30 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi Sam,
>
> (Assuming all the tuples have the same key) One solution could be to use
> ParDo with State (to calculate the mean): for each element as it arrives,
> update the mean (store the sum and count as the state) and emit the tuple
> with the new average value.
> But it will limit the parallelism.
>
> Regards,
> Rahul

Re: Joining PCollections to aggregates of themselves

Posted by rahul patwari <ra...@gmail.com>.
Hi Sam,

(Assuming all the tuples have the same key) One solution could be to use
ParDo with State (to calculate the mean): for each element as it arrives,
update the mean (store the sum and count as the state) and emit the tuple
with the new average value.
But it will limit the parallelism.
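
The sum-and-count idea can be sketched in plain Java as below. This is only a stand-in for per-key state, not the Beam state API; the class RunningMeanState and the method addAndGetMean are hypothetical names.

```java
/** Hypothetical stand-in for per-key state holding a sum and a count. */
final class RunningMeanState {
    private long sum;
    private long count;

    // Fold one value into the state and return the updated mean, so the
    // result for an element always reflects that element.
    double addAndGetMean(int value) {
        sum += value;
        count++;
        return (double) sum / count;
    }

    public static void main(String[] args) {
        RunningMeanState state = new RunningMeanState();
        for (int value : new int[] {1, 2, 6, 4}) {
            System.out.println(value + " -> " + state.addAndGetMean(value));
        }
    }
}
```

For the values 1, 2, 6, 4 this gives 1.0, 1.5, 3.0 and 3.25. The last result differs from the 5.0 in the original illustration because a bare sum and count never expire old values; dropping values older than 10 minutes would need the individual elements to be kept as well.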

Regards,
Rahul

Re: Joining PCollections to aggregates of themselves

Posted by Eugene Kirpichov <ki...@google.com>.
" input elements can pass through the Joiner DoFn before the sideInput
corresponding to that element is present"

I don't think this is correct. Runners will evaluate a DoFn with side
inputs on elements in a given window only after all side inputs are ready
(have triggered at least once) in this window, so your code should be safe.
However, runners will not rerun the DoFn with side inputs on subsequent
triggerings of the side inputs, so you won't be able to update the results.
