Posted to user@beam.apache.org by Stephan Kotze <st...@gmail.com> on 2018/05/23 11:43:10 UTC

Global Window - Correctness/Completeness

Hi

We are trying to implement a scenario that requires a rigid order of events
arriving into a Global Aggregation.

To protect the innocent, I've simplified and modified the exact scenario
into something like the following:


   1. I want to calculate, for example, a client's all-time bank balance.
   2. At fixed intervals, I would like to emit this all-time balance and
   join it onto other fixed-window aggregates.


In principle something like this:
[image: image.png]

Now the problem is that the order in which the fixed windows arrive in the
Global window does not appear to be guaranteed (given our
reading/experimenting).

This means that something like the following could potentially occur:
[image: image.png]

Essentially, even though the events/fixed windows are ordered, we can at
any given point in time emit an incorrect balance for the particular
person (even though our underlying events are ordered, and no late data
can arrive).

Given our current understanding of how the compute gets parallelised
(which is how Beam achieves its high throughput), this appears unavoidable.

Unfortunately, we do require a globally correct balance every time we emit
the value.

Are we missing something?
Does one need to sacrifice this consistency if using Beam?
Must one write some sort of stateful DoFn that keeps track of the past X
windows and eventually decides: "OK, I'm as consistent as I am willing to
be", re-orders the events, emits results, purges the "complete set",
ignores anything further that arrives late (due to internal Beam processing
rather than late data) and moves along?
Or is there something else we can do (tweak some options :), or move this
calc into another streaming framework with stricter guarantees, for
example)?

It seems like such an obvious use case at first glance, but learning more
about Beam seems to indicate that this is not actually a use case that it
is well suited for?

Stephan

Re: Global Window - Correctness/Completeness

Posted by Lukasz Cwik <lc...@google.com>.
I forgot to mention, but instead of using CoGBK (or Flatten + GBK), you can
use side inputs with a custom window function with a window mapping
function which remaps the main window onto the side input window from the
past. This is useful if you plan to join the past sum with many
aggregations or you plan to use a different windowing strategy between the
BackEdgeSource and OriginalSource.
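
As a rough illustration of the window remapping (plain Python, not the Beam
API; IntervalWindow, side_input_window and the 5 minute size are names and
values assumed only for this sketch), the mapping function just has to shift
the main input's window back by one window length:

```python
from dataclasses import dataclass

WINDOW_SIZE = 5 * 60  # seconds; assumes the 5 minute windows used in this thread


@dataclass(frozen=True)
class IntervalWindow:
    start: int  # inclusive, in seconds
    end: int    # exclusive, in seconds


def side_input_window(main_window: IntervalWindow) -> IntervalWindow:
    """Map a main-input window onto the fixed window immediately before it."""
    return IntervalWindow(main_window.start - WINDOW_SIZE, main_window.start)


# The aggregation for [600, 900) would read the past sum from [300, 600).
print(side_input_window(IntervalWindow(600, 900)))
```

In a real pipeline this logic would live in the side input's window function;
the sketch only shows the arithmetic.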

On Wed, May 23, 2018 at 9:40 AM Lukasz Cwik <lc...@google.com> wrote:

> The issue with Apache Beam and many stream processing systems is that they
> don't support back edges since they produce loops which may or may not be
> able to be resolved.
>
> One way to work around this is to emit your data to a pubsub system like
> Kafka or GCP Pubsub and read that as a source within your pipeline in
> addition to your normal source.
> OriginalSource --> 5 min aggregations --> CoGBK --> current sum --> OriginalSink
> BackEdgeSource -------------------------/               \-- BackEdgeSink
> (where current sum = current aggregation + past sum)
>
> This allows both the OriginalSource and the BackEdgeSource to maintain
> their own watermarks, and the CoGBK (or Flatten + GBK if the inputs have
> the same key and value type) to produce output based upon the windowing
> strategy you define (no manual state/timers management!). It's important
> that the timestamps of the elements emitted by the BackEdgeSource fall in
> the window they will be used in, so for a 5 min aggregation window, if the
> current sum is at time X, you should emit it at X+5 to become the past sum
> for the next window. Similarly, it would be wise to use triggers which only
> fire once (so no speculative or late firings).
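
The timestamp shift can be sketched with a small single-key simulation. This
is plain Python rather than a Beam pipeline: a dictionary stands in for the
pubsub topic, the CoGBK join is modelled as a per-window lookup, and
run_back_edge and its parameters are hypothetical names.

```python
WINDOW_MIN = 5  # window length in minutes (assumed, matching the 5 min example)


def run_back_edge(aggregates, first_window, last_window):
    """Simulate the back-edge loop for one key.

    aggregates maps a window start (in minutes) to that window's 5 min
    aggregate. Returns a dict of window start -> current sum.
    """
    back_edge = {}  # stand-in for the topic: window start -> past sum
    results = {}
    for start in range(first_window, last_window + 1, WINDOW_MIN):
        # "Join" this window's aggregate with the past sum stamped into it.
        current_sum = aggregates.get(start, 0) + back_edge.get(start, 0)
        results[start] = current_sum
        # Emit the current sum at X + 5 so it lands in the next window.
        back_edge[start + WINDOW_MIN] = current_sum
    return results


print(run_back_edge({0: 10, 5: -3, 15: 7}, first_window=0, last_window=15))
```

Note that the window starting at minute 10 has no aggregate of its own; the
simulation still forwards the past sum through it, which a real pipeline
would also have to arrange for the chain not to break.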
>
>
> If you don't want to use the solution above, then using Global windows and
> a stateful DoFn makes the most sense based upon what you described: "track
> the past X windows and eventually decides: OK I'm as consistent as I am
> willing to be, Re-order the events, emit results, purge the 'complete set'
> and ignore any further late (due to internal Beam processing rather than
> late data)". Note that a stateful DoFn is partitioned by key and window, so
> if you use 5 minute windows you'll only see the state that was recorded
> for that 5 mins, which is why it's important to use the global window. Also
> note that it's easy to have "leaks" in stateful DoFns, since the watermark
> will never surpass the point in time when the state data would be garbage
> collected.
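
To make the "partitioned by key and window" point concrete, here is a toy
model in plain Python (not Beam code; the dictionary keyed by (key, window)
is an assumed stand-in for per-key-and-window state, and the function names
are made up for the sketch):

```python
from collections import defaultdict

WINDOW = 300  # seconds, i.e. 5 minute fixed windows


def fixed_window(ts):
    """Start of the 5 minute window containing ts."""
    return ts - ts % WINDOW


def global_window(ts):
    """Every timestamp falls into the same single window."""
    return "global"


def running_balances(events, window_fn):
    """Return the balance seen after each event when state is scoped to
    (key, window), as it is for a stateful DoFn."""
    state = defaultdict(int)
    seen = []
    for key, ts, amount in events:
        cell = (key, window_fn(ts))
        state[cell] += amount
        seen.append(state[cell])
    return seen


events = [("alice", 10, 100), ("alice", 200, -30), ("alice", 310, 5)]
print(running_balances(events, fixed_window))   # state restarts at ts=310
print(running_balances(events, global_window))  # state carries across windows
```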
>
>
> You might have better success with a simpler application though; it all
> depends on what the rest of the system looks like and what you're
> integrating with.

Re: Global Window - Correctness/Completeness

Posted by Stephan Kotze <st...@gmail.com>.
I forgot to add: and thanks for all the input!

Stephan


Re: Global Window - Correctness/Completeness

Posted by Stephan Kotze <st...@gmail.com>.
Indeed, this is exactly what we ended up doing, for now. :)

Blog would be sweet :)

Stephan


Re: Global Window - Correctness/Completeness

Posted by Reza Rokni <re...@google.com>.
Hi,

Here is some code that I am working on, not yet fully tested, which might be
interesting...

In the Dataflow runner, timers will fire in order, so this is the code that
I am playing with to solve a similar type of use case (if I have understood
your use case correctly...):

When you enter the Keyed State on the Global Window, set a timer that will
fire in the next Fixed window (lower boundary + x).
Attach all accum objects to a List<Accum> in the Keyed State.

OnTimer fire()
{
  Read all elements in list
  Sort
  Output results...
}

Hope to blog about this soon if the testing doesn't find any issues...
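
A minimal plain-Python model of the buffer-and-sort idea above, not the
actual code being tested: BufferAndSort, the offset X and the tuple layout
of an accum are all assumptions made for the sketch, and the timer is simply
invoked by hand instead of by a runner.

```python
WINDOW = 300  # seconds, 5 minute fixed windows (assumed)
X = 1         # assumed offset past the lower boundary of the next window


class BufferAndSort:
    """Stand-in for the keyed state of one key in the global window."""

    def __init__(self):
        self.buffer = []   # plays the role of List<Accum>
        self.timer = None  # firing time of the pending timer, if any

    def process(self, accum_ts, accum_value):
        """Buffer an accum and make sure a timer is pending."""
        self.buffer.append((accum_ts, accum_value))
        if self.timer is None:
            next_window_start = accum_ts - accum_ts % WINDOW + WINDOW
            self.timer = next_window_start + X

    def on_timer(self):
        """Read all buffered accums, sort them by timestamp and output them."""
        ordered = sorted(self.buffer)
        self.buffer.clear()
        self.timer = None
        return ordered


state = BufferAndSort()
for ts, value in [(600, 5), (0, 10), (300, -3)]:  # arrives out of order
    state.process(ts, value)
print(state.timer, state.on_timer())
```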


On 25 May 2018 at 09:00, Robert Bradshaw <ro...@google.com> wrote:

> This is an interesting question.
>
> From a model perspective, one way to represent this is that you're trying
> to group all your events into windows with the custom windowing function
>
>     t -> { [-infinity, k*5min) : k*5min > t }
>
> where k varies over all integers. You would then do a per-key sum over
> these values. Unfortunately, no Beam runner supports infinite window sets
> such as these...
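
A small sketch of why this window set is infinite, assuming half-open
windows [-infinity, k*5min) and that an event at time t belongs to every
such window that contains it. The generator name and the choice of minutes
as the unit are made up for the illustration:

```python
import itertools

WINDOW = 5  # minutes


def cumulative_window_ends(t):
    """Lazily yield the end of every window [-inf, k*5min) that contains t.

    The sequence never terminates, which is the reason a runner cannot
    materialise the full window set for an element.
    """
    k = t // WINDOW + 1  # smallest k with k*5min > t
    while True:
        yield k * WINDOW
        k += 1


# First three of the infinitely many windows an event at minute 12 falls into.
print(list(itertools.islice(cumulative_window_ends(12), 3)))
```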
>
> As mentioned, you could do this with a stateful DoFn and timers in the
> global window. When the partial sum at time T arrives, store it in your
> state and set a timer for T. When this timer fires, you know you'll have
> received all partial sums up to T (plus possibly some after T), and you can
> emit (and store) the cumulative sum(s) up to and including T and purge
> the buffer.
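
The store-then-fire logic can be modelled in plain Python as below. This is
a simulation under assumptions, not Beam code: CumulativeSum is a
hypothetical class, advance_watermark stands in for the runner firing
event-time timers in order, and timestamps are plain integers.

```python
import heapq


class CumulativeSum:
    """Per-key state of a hypothetical stateful DoFn in the global window."""

    def __init__(self):
        self.pending = {}  # T -> partial sum, buffered until its timer fires
        self.timers = []   # min-heap of event-time timers
        self.total = 0     # stored cumulative sum

    def process(self, t, partial_sum):
        """Store the partial sum for time t and set a timer for t."""
        self.pending[t] = self.pending.get(t, 0) + partial_sum
        heapq.heappush(self.timers, t)

    def advance_watermark(self, watermark):
        """Fire every timer at or before the watermark, earliest first."""
        emitted = []
        while self.timers and self.timers[0] <= watermark:
            t = heapq.heappop(self.timers)
            if t in self.pending:  # a duplicate timer finds nothing to do
                self.total += self.pending.pop(t)
                emitted.append((t, self.total))
        return emitted


state = CumulativeSum()
for t, partial in [(10, 7), (5, 3), (15, -2)]:  # arrives out of order
    state.process(t, partial)
print(state.advance_watermark(10))  # sums up to T=10; T=15 stays buffered
print(state.advance_watermark(20))
```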
>
> On Thu, May 24, 2018 at 5:38 PM Raghu Angadi <ra...@google.com> wrote:
>
>> I don't think you need to keep the last 'n' windows with a stateful DoFn
>> in order to reorder. You mentioned your source is ordered, which implies it
>> can effectively have a serial id (global or per key). With each 5 minute
>> aggregation you also include the serial-id range. Inside the stateful DoFn,
>> you can easily decide if you can emit immediately (the normal case of
>> time order, since the window size is large), or if you need to buffer.
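
One way to picture the serial-id check, as a plain-Python sketch with
assumed names (SerialReorderer, first_id, last_id) and the assumption that
consecutive aggregates cover contiguous, non-overlapping id ranges:

```python
class SerialReorderer:
    """Emit running balances in serial-id order, buffering only on gaps."""

    def __init__(self, first_id=0):
        self.next_id = first_id  # first serial id not yet accounted for
        self.buffered = {}       # range start -> (range end, aggregate)
        self.balance = 0

    def process(self, first_id, last_id, aggregate):
        """Accept one 5 minute aggregate tagged with its serial-id range and
        return the balances that can be emitted right away."""
        self.buffered[first_id] = (last_id, aggregate)
        emitted = []
        # Drain every aggregate that is now contiguous with what was emitted.
        while self.next_id in self.buffered:
            last, value = self.buffered.pop(self.next_id)
            self.balance += value
            emitted.append(self.balance)
            self.next_id = last + 1
        return emitted


r = SerialReorderer(first_id=1)
print(r.process(1, 4, 10))   # in order: emitted immediately
print(r.process(9, 12, 5))   # ids 5..8 missing: buffered
print(r.process(5, 8, -3))   # gap filled: both balances emitted
```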
>>
>> On Wed, May 23, 2018 at 11:40 AM Stephan Kotze <st...@gmail.com>
>> wrote:
>>
>>> Indeed, and very true :)
>>>
>>> More a flight of fancy, than anything else.
>>>
>>> Trying hard to find the right tool for the job, but something like this
>>> could potentially reduce the need for us to either rewrite the app in
>>> Flink, or do some parts in Beam and others in Flink (risks aside for this
>>> conversation, as it is potentially a hairy implementation with more effort
>>> and risk than is worth the reward).
>>>
>>> Dunno ;)
>>>
>>> Stephan
>>>
>>> On Wed, May 23, 2018 at 7:22 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> You could, but you would need to change how the translation of an Apache
>>>> Beam pipeline is done within Flink
>>>> https://github.com/apache/beam/blob/8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L47
>>>> (and possibly its superclass). You would need to take care of how
>>>> stream.keyBy(...).orderByTime() affects windowing and triggering behavior
>>>> with regard to Apache Beam, and may need to be quite well versed in the
>>>> Apache Flink execution model and the Apache Beam model.
>>>>
>>>> You might as well use Flink instead of writing an Apache Beam
>>>> pipeline.
>>>>
>>>> On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <
>>>> stephanus.kotze@gmail.com> wrote:
>>>>
>>>>> Thanks Lukasz.
>>>>>
>>>>> As a Hypothetical question (to anyone more familiar with the runner
>>>>> than I):
>>>>> If one is using the Flink runner, would it theoretically be possible
>>>>> to modify the runner so that it constructs its pipelines as:
>>>>> stream.keyBy(...).orderByTime()
>>>>> <https://cwiki.apache.org/confluence/display/FLINK/stream.keyBy(...).orderByTime()>
>>>>> .
>>>>> Unless I'm missing something, this could be a cheeky way (bypassing
>>>>> the Beam API) to enforce stricter ordering (with all the ensuing costs) on
>>>>> a runner that would appear to support the desired semantics?
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
>>>>>
>>>>> Stephan
>>>>>
>>>>> On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> The issue with Apache Beam and many stream processing systems is that
>>>>>> they don't support back edges since they produce loops which may or may not
>>>>>> be able to be resolved.
>>>>>>
>>>>>> One way to work around this is to emit your data to a pubsub system
>>>>>> like Kafka or GCP Pubsub and read that as a source within your pipeline in
>>>>>> addition to your normal source.
>>>>>> OriginalSource --> 5 min aggregations --> CoGBK --> current sum =
>>>>>> current aggregation + past sum  --> OriginalSink
>>>>>> BackEdgeSource -------------------------/
>>>>>>                              \-- BackEdgeSink
>>>>>>
>>>>>> This allows for both the OriginalSource and BackEdgeSource to
>>>>>> maintain their own watermarks and for the CoGBK (or Flatten + GBK if the
>>>>>> input has the same key and value type) to produce output based upon the
>>>>>> windowing strategy you define (no manual state/timers management !). Its
>>>>>> important that the BackEdgeSource's elements timestamps that are emitted
>>>>>> are for the window it will be used in, so for a 5 min aggregation window if
>>>>>> the current sum is at time X, you should emit it at X+5 to become the past
>>>>>> sum for the next window. Similarly it would be wise to use triggers which
>>>>>> only fire once (so no speculative or late firings).
>>>>>>
>>>>>>
>>>>>> If you don't want to use the solution above, then using Global
>>>>>> windows and stateful DoFn makes the most sense based upon what you
>>>>>> described using "track the past X windows and eventually decides: OK
>>>>>> I'm as consistent as I am willing to be, Re-order the events, emit results,
>>>>>> purge the "complete set" and ignore any further late (due to internal beam
>>>>>> processing rather than late data). Note that stateful DoFn is partitioned
>>>>>> by key and window so if you use 5 minute windows you'll only see the state
>>>>>> that that was recorded for that 5 mins which is why its important to use
>>>>>> the global window, also note that its easy to have "leaks" in stateful
>>>>>> DoFns since the watermark will never surpass the point in time when the
>>>>>> state data would be garbage collected.
>>>>>>
>>>>>>
>>>>>> You might have better success with a simpler application though, it
>>>>>> all depends on what the rest of the system looks like and what your
>>>>>> integrating with.
>>>>>>
>>>>>> On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <
>>>>>> stephanus.kotze@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> We are trying to implement a scenario that requires a rigid order of
>>>>>>> events arriving into a Global Aggregation.
>>>>>>>
>>>>>>> To protect the innocent, I've simplified and modified the exact
>>>>>>> scenario into something like the following:
>>>>>>>
>>>>>>>
>>>>>>>    1. I want to calculate a Client's all time Bank balance for
>>>>>>>    example.
>>>>>>>    2. I would like to at fixed intervals, emit this all time
>>>>>>>    Balance, and join it onto other Fixed window aggregates.
>>>>>>>
>>>>>>>
>>>>>>> In principle something like this:
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> Now the problem is, that it would seem that the order in which the
>>>>>>> fixed windows arrive in the Global window is not Guaranteed (given our
>>>>>>> reading/experimenting).
>>>>>>>
>>>>>>> This means that something like the following could potentially occur:
>>>>>>> [image: image.png]
>>>>>>>
>>>>>>> Essentially, even though the events/fixed windows are ordered, we
>>>>>>> can at any given point in time omit an incorrect balance for the particular
>>>>>>> person. (Even though our underlying events are ordered, and no late data
>>>>>>> can arrive).
>>>>>>>
>>>>>>> Given our current understanding  of how the compute gets
>>>>>>> parallelised (which is how Beam Achieves its high throughput) this appears
>>>>>>> unavoidable.
>>>>>>>
>>>>>>> Unfortunately, we do require a globally correct balance every time
>>>>>>> we omit the value.
>>>>>>>
>>>>>>> Are we missing something?
>>>>>>> Does one need to sacrifice this consistency if using Beam?
>>>>>>> Must one write some sort of a stateful DoFn that keeps track of the
>>>>>>> past X windows and eventually decides: OK I'm as consistent as I am willing
>>>>>>> to be, Re-order the events, emit results, purge the "complete set" and
>>>>>>> ignore any further late (due to internal beam processing rather than late
>>>>>>> data) and move along?
>>>>>>> Or is there something else we can do? (,Tweak some options :) or
>>>>>>> Move this calc into another streaming framework, with stricter guarantees
>>>>>>> for example).
>>>>>>>
>>>>>>> It seams like such an obvious use case at first glance, but learning
>>>>>>> more about Beam, seems to indicate that this is not actually a use case
>>>>>>> that it is well suited for?
>>>>>>>
>>>>>>> Stephan
>>>>>>>
>>>>>>>


Re: Global Window - Correctness/Completeness

Posted by Raghu Angadi <ra...@google.com>.
On Thu, May 24, 2018 at 6:05 PM Robert Bradshaw <ro...@google.com> wrote:

> As mentioned, you could do this with a stateful DoFn and timers in the
> global window. When the partial sum at time T arrives, store it in your
> state and set a timer for T. When this timer fires, you know you'll have
> received all partial sums up to T (plus possibly some after T), and you can
> emit (and store) the cumulative sum(s) up to and including T and purge
> the buffer.
>

+1. This is simpler than what I mentioned above (no dependency on serial
ids).



Re: Global Window - Correctness/Completeness

Posted by Robert Bradshaw <ro...@google.com>.
This is an interesting question.

From a model perspective, one way to represent this is that you're trying
to group all your events into windows with the custom windowing function

    t -> { [-infinity, k*5min) : k*5min > t }

where k varies over all integers. You would then do a per-key sum over
these values. Unfortunately, no Beam runner supports infinite window sets
such as these...
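
To make the "infinite window set" concrete, here is a small plain-Python
sketch (not Beam API). It reads the window set as the cumulative windows
[-infinity, k*5min) that contain t, i.e. those ending after t. The function
name and the use of seconds for timestamps are assumptions made only for
illustration.

```python
FIVE_MIN = 5 * 60  # window granularity in seconds (assumed unit)


def cumulative_window_ends(t, count):
    """Return the end times of the first `count` cumulative windows containing t.

    Each window is [-infinity, k * FIVE_MIN). A timestamp t belongs to every
    such window whose end is strictly greater than t, so the full set is
    unbounded; `count` truncates it for display.
    """
    k = t // FIVE_MIN + 1  # smallest k with k * FIVE_MIN > t
    return [(k + i) * FIVE_MIN for i in range(count)]


if __name__ == "__main__":
    # An event at t=0 contributes to the sums ending at 300, 600, 900, ...
    print(cumulative_window_ends(0, 3))
```

Because the list never ends for any t, a runner would have to materialise an
unbounded number of windows per element, which is why this is only a way of
describing the computation and not something to run as a WindowFn.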

As mentioned, you could do this with a stateful DoFn and timers in the
global window. When the partial sum at time T arrives, store it in your
state and set a timer for T. When this timer fires, you know you'll have
received all partial sums up to T (plus possibly some after T), and you can
emit (and store) the cumulative sum(s) up to and including T and purge
the buffer.
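
The buffering logic can be sketched as a plain-Python simulation. This is
not Beam code: the class and method names are hypothetical, the dict stands
in for per-key state, and calling on_watermark() stands in for event-time
timers firing once the watermark passes T.

```python
from collections import defaultdict


class CumulativeSumState:
    """Simulates per-key state plus event-time timers in the global window."""

    def __init__(self):
        self.buffer = defaultdict(dict)  # key -> {window end T: partial sum}
        self.total = defaultdict(int)    # key -> cumulative sum emitted so far

    def on_partial_sum(self, key, window_end, partial_sum):
        # Store the partial sum; the buffer entry doubles as the pending
        # "timer for T" in this simulation.
        pending = self.buffer[key]
        pending[window_end] = pending.get(window_end, 0) + partial_sum

    def on_watermark(self, watermark):
        """Fire every pending timer with T <= watermark, oldest first."""
        emitted = []
        for key, pending in self.buffer.items():
            for t in sorted(t for t in pending if t <= watermark):
                self.total[key] += pending.pop(t)  # purge the buffered value
                emitted.append((key, t, self.total[key]))
        return emitted


if __name__ == "__main__":
    state = CumulativeSumState()
    state.on_partial_sum("alice", 10, 7)    # arrives first, but is later in time
    state.on_partial_sum("alice", 5, 100)
    print(state.on_watermark(5))   # only the sum up to T=5 is released
    print(state.on_watermark(10))  # now the sum up to T=10 follows
```

The point of the sketch is that arrival order does not matter: a partial sum
is only folded into the running total once the watermark guarantees that
nothing earlier can still show up.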

On Thu, May 24, 2018 at 5:38 PM Raghu Angadi <ra...@google.com> wrote:

> I don't think you need to keep the last 'n' windows in a stateful DoFn in
> order to reorder. You mentioned your source is ordered, which implies it
> can effectively have a serial id (global or per key). With each 5 minute
> aggregation, also include the serial-id range it covers. Inside the
> stateful DoFn, you can then easily decide whether to emit immediately (the
> normal case, since aggregates mostly arrive in time order when the window
> size is large) or whether you need to buffer.
>

Re: Global Window - Correctness/Completeness

Posted by Raghu Angadi <ra...@google.com>.
I don't think you need to keep the last 'n' windows in a stateful DoFn in
order to reorder. You mentioned your source is ordered, which implies it can
effectively have a serial id (global or per key). With each 5 minute
aggregation, also include the serial-id range it covers. Inside the stateful
DoFn, you can then easily decide whether to emit immediately (the normal
case, since aggregates mostly arrive in time order when the window size is
large) or whether you need to buffer.
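
A rough plain-Python sketch of that decision (again not Beam API; the class
name is hypothetical, and it assumes each aggregate carries a half-open
serial-id range [start, end) and that ranges for one key are contiguous):

```python
class SerialOrderedSummer:
    """Folds aggregates into a running total strictly in serial-id order."""

    def __init__(self, first_serial=0):
        self.next_serial = first_serial  # first serial id not yet folded in
        self.total = 0
        self.pending = {}  # start serial -> (end serial, partial sum)

    def add(self, start, end, partial_sum):
        """Accept an aggregate covering serial ids [start, end).

        Returns the running totals that can be emitted now: one value in the
        normal in-order case, several when a gap has just been filled, and
        none when the aggregate arrived early and has to be buffered.
        """
        self.pending[start] = (end, partial_sum)
        emitted = []
        while self.next_serial in self.pending:
            next_end, value = self.pending.pop(self.next_serial)
            self.total += value
            self.next_serial = next_end
            emitted.append(self.total)
        return emitted


if __name__ == "__main__":
    summer = SerialOrderedSummer()
    print(summer.add(0, 3, 10))  # in order: emitted immediately
    print(summer.add(5, 8, 1))   # early: buffered, nothing emitted
    print(summer.add(3, 5, 4))   # fills the gap: two totals released
```

Since the ranges are defined by serial ids and not by time, a 5 minute window
that happens to be empty leaves no gap to wait for.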

On Wed, May 23, 2018 at 11:40 AM Stephan Kotze <st...@gmail.com>
wrote:

> Indeed, and very true :)
>
> More a flight of fancy than anything else.
>
> Trying hard to find the right tool for the job, but something like this
> could potentially (risks aside for this conversation, as it is potentially a
> hairy implementation with more effort and risk than the reward is worth)
> reduce the need for us to either rewrite the app in Flink or do some
> parts in Beam and others in Flink.
>
> Dunno ;)
>
> Stephan
>

Re: Global Window - Correctness/Completeness

Posted by Stephan Kotze <st...@gmail.com>.
Indeed, and very true :)

More a flight of fancy than anything else.

Trying hard to find the right tool for the job, but something like this
could potentially (risks aside for this conversation, as it is potentially a
hairy implementation with more effort and risk than the reward is worth)
reduce the need for us to either rewrite the app in Flink or do some
parts in Beam and others in Flink.

Dunno ;)

Stephan

On Wed, May 23, 2018 at 7:22 PM Lukasz Cwik <lc...@google.com> wrote:

> You could, but you would need to change how the translation of an Apache
> Beam pipeline is done within Flink
> https://github.com/apache/beam/blob/8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L47
> (and possibly its superclass). You would need to take care of how
> stream.keyBy(...).orderByTime() affects windowing and triggering behavior
> with regard to Apache Beam, and you may need to be quite well versed in both
> the Apache Flink execution model and the Apache Beam model.
>
> You might as well use Flink instead of writing an Apache Beam pipeline.
>
> On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <st...@gmail.com>
> wrote:
>
>> Thanks Lukasz.
>>
>> As a Hypothetical question (to anyone more familiar with the runner than
>> I):
>> If one is using the Flink runner, would it theoretically be possible to
>> modify the runner so that it constructs its pipelines as:
>> stream.keyBy(...).orderByTime()
>> <https://cwiki.apache.org/confluence/display/FLINK/stream.keyBy(...).orderByTime()>
>> .
>> Unless I'm missing something, this could be a cheeky way (bypassing the
>> Beam API) to enforce stricter ordering (with all the ensuing costs) on a
>> runner that would appear to support the desired semantics?
>>
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
>>
>> Stephan
>>
>> On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The issue with Apache Beam and many stream processing systems is that
>>> they don't support back edges, since back edges produce loops that may or
>>> may not be resolvable.
>>>
>>> One way to work around this is to emit your data to a pubsub system like
>>> Kafka or GCP Pubsub and read that as a source within your pipeline in
>>> addition to your normal source.
>>> OriginalSource --> 5 min aggregations --\
>>> BackEdgeSource --------------------------+--> CoGBK --> current sum --+--> OriginalSink
>>>                                                                        \--> BackEdgeSink
>>>
>>> (current sum = current aggregation + past sum)
>>>
>>> This allows both the OriginalSource and the BackEdgeSource to maintain
>>> their own watermarks, and allows the CoGBK (or Flatten + GBK if the inputs
>>> have the same key and value type) to produce output based upon the
>>> windowing strategy you define (no manual state/timer management!). It's
>>> important that the timestamps of the elements emitted by the
>>> BackEdgeSource fall in the window in which they will be used, so for a
>>> 5 min aggregation window, if the current sum is at time X, you should emit
>>> it at X+5 to become the past sum for the next window. Similarly, it would
>>> be wise to use triggers which only fire once (so no speculative or late
>>> firings).
>>>
>>>
>>> If you don't want to use the solution above, then using Global windows
>>> and stateful DoFn makes the most sense based upon what you described using "track
>>> the past X windows and eventually decides: OK I'm as consistent as I am
>>> willing to be, Re-order the events, emit results, purge the "complete set"
>>> and ignore any further late (due to internal beam processing rather than
>>> late data). Note that stateful DoFn is partitioned by key and window so if
>>> you use 5 minute windows you'll only see the state that that was recorded
>>> for that 5 mins which is why its important to use the global window, also
>>> note that its easy to have "leaks" in stateful DoFns since the watermark
>>> will never surpass the point in time when the state data would be garbage
>>> collected.
>>>
>>>
>>> You might have better success with a simpler application though, it all
>>> depends on what the rest of the system looks like and what your integrating
>>> with.
>>>
>>> On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <st...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> We are trying to implement a scenario that requires a rigid order of
>>>> events arriving into a Global Aggregation.
>>>>
>>>> To protect the innocent, I've simplified and modified the exact
>>>> scenario into something like the following:
>>>>
>>>>
>>>>    1. I want to calculate a Client's all time Bank balance for example.
>>>>    2. I would like to at fixed intervals, emit this all time Balance,
>>>>    and join it onto other Fixed window aggregates.
>>>>
>>>>
>>>> In principle something like this:
>>>> [image: image.png]
>>>>
>>>> Now the problem is, that it would seem that the order in which the
>>>> fixed windows arrive in the Global window is not Guaranteed (given our
>>>> reading/experimenting).
>>>>
>>>> This means that something like the following could potentially occur:
>>>> [image: image.png]
>>>>
>>>> Essentially, even though the events/fixed windows are ordered, we can
>>>> at any given point in time omit an incorrect balance for the particular
>>>> person. (Even though our underlying events are ordered, and no late data
>>>> can arrive).
>>>>
>>>> Given our current understanding  of how the compute gets parallelised
>>>> (which is how Beam Achieves its high throughput) this appears unavoidable.
>>>>
>>>> Unfortunately, we do require a globally correct balance every time we
>>>> omit the value.
>>>>
>>>> Are we missing something?
>>>> Does one need to sacrifice this consistency if using Beam?
>>>> Must one write some sort of a stateful DoFn that keeps track of the
>>>> past X windows and eventually decides: OK I'm as consistent as I am willing
>>>> to be, Re-order the events, emit results, purge the "complete set" and
>>>> ignore any further late (due to internal beam processing rather than late
>>>> data) and move along?
>>>> Or is there something else we can do? (,Tweak some options :) or Move
>>>> this calc into another streaming framework, with stricter guarantees for
>>>> example).
>>>>
>>>> It seams like such an obvious use case at first glance, but learning
>>>> more about Beam, seems to indicate that this is not actually a use case
>>>> that it is well suited for?
>>>>
>>>> Stephan
>>>>
>>>>

Re: Global Window - Correctness/Completeness

Posted by Lukasz Cwik <lc...@google.com>.
You could, but you would need to change how the translation of an Apache Beam
pipeline is done within the Flink runner:
https://github.com/apache/beam/blob/8345991842dbc1f3cd8c2902ea8989bb0a8e81fb/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L47
(and possibly its superclass). You would need to take care of how
stream.keyBy(...).orderByTime() affects windowing and triggering behavior with
regards to Apache Beam, and you may need to be quite well versed in both the
Apache Flink execution model and the Apache Beam model.

You might as well use Flink instead of writing an Apache Beam pipeline.
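
To picture the kind of ordering being discussed, here is a minimal sketch in
plain Python (not Flink or Beam code). It assumes that an operation like
orderByTime() behaves as a watermark-driven sort: elements are buffered and
only released, in timestamp order, once the watermark has passed them. The
names EventTimeSorter, add and advance_watermark are hypothetical and exist
only for this illustration.

```python
import heapq


class EventTimeSorter:
    """Buffers elements and releases them in event-time order."""

    def __init__(self):
        self._heap = []  # min-heap ordered by (timestamp, arrival sequence)
        self._seq = 0    # tie-breaker so equal timestamps keep arrival order

    def add(self, timestamp, value):
        heapq.heappush(self._heap, (timestamp, self._seq, value))
        self._seq += 1

    def advance_watermark(self, watermark):
        """Release every buffered element with timestamp <= watermark."""
        released = []
        while self._heap and self._heap[0][0] <= watermark:
            timestamp, _, value = heapq.heappop(self._heap)
            released.append((timestamp, value))
        return released


sorter = EventTimeSorter()
for ts, value in [(7, "c"), (3, "a"), (5, "b")]:  # out-of-order arrival
    sorter.add(ts, value)

print(sorter.advance_watermark(5))   # [(3, 'a'), (5, 'b')]
print(sorter.advance_watermark(10))  # [(7, 'c')]
```

The cost is visible in the sketch: nothing is released until the watermark
advances, so stricter ordering is paid for with buffering and latency.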

On Wed, May 23, 2018 at 10:20 AM Stephan Kotze <st...@gmail.com>
wrote:

> Thanks Lukasz.
>
> As a Hypothetical question (to anyone more familiar with the runner than
> I):
> If one is using the Flink runner, would it theoretically be possible to
> modify the runner so that it constructs its pipelines as
> stream.keyBy(...).orderByTime()?
> Unless I'm missing something, this could be a cheeky way (bypassing the
> Beam API) to enforce stricter ordering (with all the ensuing costs) on a
> runner that would appear to support the desired semantics?
>
> https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
>
> Stephan

Re: Global Window - Correctness/Completeness

Posted by Stephan Kotze <st...@gmail.com>.
Thanks Lukasz.

As a Hypothetical question (to anyone more familiar with the runner than I):
If one is using the Flink runner, would it theoretically be possible to
modify the runner so that it constructs its pipelines as
stream.keyBy(...).orderByTime()?
Unless I'm missing something, this could be a cheeky way (bypassing the
Beam API) to enforce stricter ordering (with all the ensuing costs) on a
runner that would appear to support the desired semantics?

https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams

Stephan

On Wed, May 23, 2018 at 5:41 PM Lukasz Cwik <lc...@google.com> wrote:

> The issue with Apache Beam and many stream processing systems is that they
> don't support back edges, since back edges produce loops which may or may
> not be resolvable.
>
> One way to work around this is to emit your data to a pubsub system like
> Kafka or GCP Pubsub and read that as a source within your pipeline in
> addition to your normal source.
> OriginalSource --> 5 min aggregations --> CoGBK --> current sum = current aggregation + past sum --> OriginalSink
> BackEdgeSource -------------------------/                                                        \-- BackEdgeSink
>
> This allows both the OriginalSource and the BackEdgeSource to maintain
> their own watermarks, and allows the CoGBK (or Flatten + GBK if the inputs
> have the same key and value type) to produce output based upon the windowing
> strategy you define (no manual state/timer management!). It's important
> that the timestamps of the elements emitted by the BackEdgeSource fall in the
> window they will be used in, so for a 5 min aggregation window, if the current
> sum is at time X, you should emit it at X+5 to become the past sum for the
> next window. Similarly, it would be wise to use triggers which only fire
> once (so no speculative or late firings).
>
>
> If you don't want to use the solution above, then using Global windows and
> a stateful DoFn makes the most sense, based upon what you described: "track
> the past X windows and eventually decides: OK I'm as consistent as I am
> willing to be, Re-order the events, emit results, purge the "complete set"
> and ignore any further late (due to internal beam processing rather than
> late data)". Note that a stateful DoFn is partitioned by key and window, so
> if you use 5 minute windows you'll only see the state that was recorded
> for those 5 mins, which is why it's important to use the global window. Also
> note that it's easy to have "leaks" in stateful DoFns, since in the global
> window the watermark will never pass the point in time at which the state
> data would be garbage collected.
>
>
> You might have better success with a simpler application though; it all
> depends on what the rest of the system looks like and what you're integrating
> with.

Re: Global Window - Correctness/Completeness

Posted by Lukasz Cwik <lc...@google.com>.
The issue with Apache Beam and many stream processing systems is that they
don't support back edges, since back edges produce loops which may or may
not be resolvable.

One way to work around this is to emit your data to a pubsub system like
Kafka or GCP Pubsub and read that as a source within your pipeline in
addition to your normal source.
OriginalSource --> 5 min aggregations --> CoGBK --> current sum = current aggregation + past sum --> OriginalSink
BackEdgeSource -------------------------/                                                        \-- BackEdgeSink

This allows both the OriginalSource and the BackEdgeSource to maintain
their own watermarks, and allows the CoGBK (or Flatten + GBK if the inputs
have the same key and value type) to produce output based upon the windowing
strategy you define (no manual state/timer management!). It's important
that the timestamps of the elements emitted by the BackEdgeSource fall in the
window they will be used in, so for a 5 min aggregation window, if the current
sum is at time X, you should emit it at X+5 to become the past sum for the
next window. Similarly, it would be wise to use triggers which only fire
once (so no speculative or late firings).
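
The timestamp rule above can be sketched in plain Python (no Beam API is
involved). This is only an illustration under stated assumptions: every
5 minute window produces exactly one aggregate, the join fires once per
window, and the very first past sum is 0. The function name run_back_edge and
its parameters are hypothetical.

```python
def run_back_edge(arrivals, first_window, window_size=5):
    """Simulate joining window aggregates with a back edge of past sums.

    arrivals: list of (window_start, aggregate) in arbitrary arrival order.
    Returns a list of (window_start, current_sum) in the order emitted.
    """
    pending = {}                   # aggregates still waiting for their past sum
    back_edge = {first_window: 0}  # past sum, keyed by the window that consumes it
    emitted = []
    for window_start, aggregate in arrivals:
        pending[window_start] = aggregate
        # A window fires only once both of its inputs are present.
        ready = [w for w in pending if w in back_edge]
        while ready:
            w = ready[0]
            current_sum = pending.pop(w) + back_edge.pop(w)
            emitted.append((w, current_sum))
            # Re-emit the sum at X + window_size: it is the next window's past sum.
            back_edge[w + window_size] = current_sum
            ready = [x for x in pending if x in back_edge]
    return emitted


# Windows 0, 5 and 10 arrive out of order, yet the sums come out in order.
print(run_back_edge([(10, 5), (0, 100), (5, -30)], first_window=0))
# prints [(0, 100), (5, 70), (10, 75)]
```

Because each window waits for the past sum stamped into it, a window can never
be added to the balance before its predecessor, whatever the arrival order.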


If you don't want to use the solution above, then using Global windows and
a stateful DoFn makes the most sense, based upon what you described: "track
the past X windows and eventually decides: OK I'm as consistent as I am
willing to be, Re-order the events, emit results, purge the "complete set"
and ignore any further late (due to internal beam processing rather than
late data)". Note that a stateful DoFn is partitioned by key and window, so
if you use 5 minute windows you'll only see the state that was recorded
for those 5 mins, which is why it's important to use the global window. Also
note that it's easy to have "leaks" in stateful DoFns, since in the global
window the watermark will never pass the point in time at which the state
data would be garbage collected.
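
As a rough picture of the stateful approach, here is a plain-Python stand-in
for per-key state in the global window. It does not use the Beam state API;
the class OrderedBalance, its fields, and the assumption that every window
produces exactly one aggregate per key are all hypothetical choices made for
this sketch.

```python
class OrderedBalance:
    """Buffers per-key window aggregates and emits balances in window order."""

    def __init__(self, first_window, window_size=5):
        self.first_window = first_window
        self.window_size = window_size
        self.state = {}  # key -> per-key state, kept for the lifetime of the key

    def process(self, key, window_start, aggregate):
        s = self.state.setdefault(
            key, {"next": self.first_window, "balance": 0, "buffer": {}}
        )
        if window_start < s["next"]:
            return []  # this window was already emitted: ignore the late arrival
        s["buffer"][window_start] = aggregate
        out = []
        # Drain the buffer for as long as the next expected window is present.
        while s["next"] in s["buffer"]:
            s["balance"] += s["buffer"].pop(s["next"])  # purge once consumed
            out.append((key, s["next"], s["balance"]))
            s["next"] += self.window_size
        return out


balances = OrderedBalance(first_window=0)
print(balances.process("alice", 5, -30))  # [] (window 0 has not arrived yet)
print(balances.process("alice", 0, 100))  # [('alice', 0, 100), ('alice', 5, 70)]
```

The "leak" risk shows up here as well: if an expected window never arrives,
the buffer for that key keeps growing, so a real implementation would need
some explicit cleanup policy.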


You might have better success with a simpler application though; it all
depends on what the rest of the system looks like and what you're integrating
with.

On Wed, May 23, 2018 at 4:43 AM Stephan Kotze <st...@gmail.com>
wrote:

> Hi
>
> We are trying to implement a scenario that requires a rigid order of
> events arriving into a Global Aggregation.
>
> To protect the innocent, I've simplified and modified the exact scenario
> into something like the following:
>
>
>    1. I want to calculate a Client's all time Bank balance for example.
>    2. I would like to emit this all time Balance at fixed intervals, and
>    join it onto other Fixed window aggregates.
>
>
> In principle something like this:
> [image: image.png]
>
> Now the problem is, that it would seem that the order in which the fixed
> windows arrive in the Global window is not Guaranteed (given our
> reading/experimenting).
>
> This means that something like the following could potentially occur:
> [image: image.png]
>
> Essentially, even though the events/fixed windows are ordered, we can at
> any given point in time emit an incorrect balance for the particular
> person. (Even though our underlying events are ordered, and no late data
> can arrive).
>
> Given our current understanding of how the compute gets parallelised
> (which is how Beam achieves its high throughput) this appears unavoidable.
>
> Unfortunately, we do require a globally correct balance every time we emit
> the value.
>
> Are we missing something?
> Does one need to sacrifice this consistency if using Beam?
> Must one write some sort of a stateful DoFn that keeps track of the past X
> windows and eventually decides: OK I'm as consistent as I am willing to be,
> Re-order the events, emit results, purge the "complete set" and ignore any
> further late (due to internal beam processing rather than late data) and
> move along?
> Or is there something else we can do? (Tweak some options :) or move this
> calc into another streaming framework with stricter guarantees, for
> example.)
>
> It seems like such an obvious use case at first glance, but learning more
> about Beam seems to indicate that this is not actually a use case that it
> is well suited for?
>
> Stephan
>
>