You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Steve Niemitz <sn...@apache.org> on 2019/02/12 15:31:05 UTC

Some questions about ensuring correctness with windowing and triggering

Hi everyone, I have some questions I want to ask about how windowing,
triggering, and panes work together, and how to ensure correctness
throughout a pipeline.

Lets assume I have a very simple streaming pipeline that looks like:
Source -> CombineByKey (Sum) -> Sink

Given fixed windows of 1 hour, early firings every minute, and accumulating
panes, this is pretty straight forward.  However, this can get more
complicated if we add steps after the CombineByKey, for instance (using the
same windowing strategy):

Say I want to buffer the results of the CombineByKey into batches of N
elements.  I can do this with the built-in GroupIntoBatches [1] transform,
now my pipeline looks like:

Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink

*This leads to my main question:*
Is ordering preserved somehow here?  ie: is it possible that the result
from early firing F+1 now comes BEFORE the firing F (because it was
re-ordered in the GroupIntoBatches).  This would mean that the sink then
gets F+1 before F, which means my resulting store has incorrect data
(possibly forever if F+1 was the final firing).

If ordering is not preserved, it seems as if I'd need to introduce my own
ordering back in after GroupIntoBatches.  GIB is an example here, but I
imagine this could happen with any GBK type operation.

Am I thinking about this the correct way?  Thanks!

[1]
https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 20, 2019 at 6:54 PM Raghu Angadi <an...@gmail.com> wrote:
>
> On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Correct, even within the same key there's no promise of event time ordering mapping of panes to real time ordering because the downstream operations may happen on a different machine. Multiply triggered windows add an element of non-determinism to the process.
>
> For clarification, the stage immediately after GBK itself processes fired panes in order, correct? Of course, any more stages downstream of that may see them out of order.

There is no such guarantee, but in runners that use the standard
group-also-by-windows libraries and do fusion this often happens to be
the case.

>> You're also correct that triggering with multiple panes requires lots of care, especially when it comes to operations with side effects (like sinks). Most safe is to only write the final pane to the sink, and handle early triggering in a different way. https://s.apache.org/beam-sink-triggers is a proposal to make this easier to reason about.
>>
>>
>> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org> wrote:
>>>
>>> Also to clarify here (I re-read this and realized it could be slightly unclear).  My question is only about in-order delivery of panes.  ie: will pane P always be delivered before P+1.
>>>
>>> I realize the use of "in-order" before could be confusing, I don't care about the ordering of the elements per-se, just the ordering of the pane delivery.
>>>
>>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at least, the final firing is always guaranteed to be delivered after all early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>
>>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org> wrote:
>>>>
>>>> Are you also saying also that even in the first example (Source -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be delivered in-order from the Combine -> Sink transforms?  This seems like a pretty big "got-cha" for correctness if you ever use accumulating triggering.
>>>>
>>>> I'd also like to point out I'm not talking about a global ordering across the entire PCollection, I'm talking about within the same key after a GBK transform.
>>>>
>>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> Due to the nature of distributed processing, order is not preserved. You can, however, inspect the PaneInfo to determine if an element was early, on-time, or late and act accordingly.
>>>>>
>>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com> wrote:
>>>>>>
>>>>>> In my experience ordering is not guaranteed, you may need apply a transformation that sort the elements and then dispatch them sorted out.
>>>>>>
>>>>>> Or uses the Sorter extension for this:
>>>>>>
>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>
>>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019, 16:31:
>>>>>>>
>>>>>>> Hi everyone, I have some questions I want to ask about how windowing, triggering, and panes work together, and how to ensure correctness throughout a pipeline.
>>>>>>>
>>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>
>>>>>>> Given fixed windows of 1 hour, early firings every minute, and accumulating panes, this is pretty straight forward.  However, this can get more complicated if we add steps after the CombineByKey, for instance (using the same windowing strategy):
>>>>>>>
>>>>>>> Say I want to buffer the results of the CombineByKey into batches of N elements.  I can do this with the built-in GroupIntoBatches [1] transform, now my pipeline looks like:
>>>>>>>
>>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>
>>>>>>> This leads to my main question:
>>>>>>> Is ordering preserved somehow here?  ie: is it possible that the result from early firing F+1 now comes BEFORE the firing F (because it was re-ordered in the GroupIntoBatches).  This would mean that the sink then gets F+1 before F, which means my resulting store has incorrect data (possibly forever if F+1 was the final firing).
>>>>>>>
>>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my own ordering back in after GroupIntoBatches.  GIB is an example here, but I imagine this could happen with any GBK type operation.
>>>>>>>
>>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>>
>>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Raghu Angadi <an...@gmail.com>.
On Tue, Feb 12, 2019 at 10:28 AM Robert Bradshaw <ro...@google.com>
wrote:

> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations *may* happen on a different machine. Multiply triggered
> windows add an element of non-determinism to the process.
>

For clarification, the stage immediately after GBK itself processes fired
panes in order, correct? Of course, any more stages downstream of that may
see them out of order.

Raghu.

>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
>
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear).  My question is only about in-order delivery of panes.  ie: will
>> pane P always be delivered before P+1.
>>
>> I realize the use of "in-order" before could be confusing, I don't care
>> about the ordering of the elements per-se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> Are you also saying also that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>> pretty big "got-cha" for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection, I'm talking about within the same key after
>>> a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element was
>>>> early, on-time, or late and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
>>>> wrote:
>>>>
>>>>> In my experience ordering is not guaranteed, you may need apply a
>>>>> transformation that sort the elements and then dispatch them sorted out.
>>>>>
>>>>> Or uses the Sorter extension for this:
>>>>>
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
>>>>> 16:31:
>>>>>
>>>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>>>> triggering, and panes work together, and how to ensure correctness
>>>>>> throughout a pipeline.
>>>>>>
>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>>>> more complicated if we add steps after the CombineByKey, for instance
>>>>>> (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches of
>>>>>> N elements.  I can do this with the built-in GroupIntoBatches [1]
>>>>>> transform, now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE the firing F (because it was
>>>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever if F+1 was the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>>>>> imagine this could happen with any GBK type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
I also think that we should improve our documentation. Multiply-fired
triggers is an advanced and dangerous feature, and care needs to be
taken that the downstream portions of the pipeline handle them
correctly.

I filed https://issues.apache.org/jira/browse/BEAM-6716

On Wed, Feb 20, 2019 at 5:40 AM Kenneth Knowles <ke...@apache.org> wrote:
>
> This is a very valid concern so I want to offer advice you can apply today.
>
> "We actually do track tentative vs final values already, but checking that at write-time would impose a pretty big overhead in the write path."
>
> You mention you are writing to BigTable. If your key is conceptually the tuple of (isFinal, actual key) then you can avoid read-modify-write, and at query time (of your BigTable) you can choose to only look at final values. There are probably variations of this so you can get "on-time" as another category to fetch.
>
> One reason we have not taken on the large-scale re-architecting is that if you control the whole pipeline it is usually possible to meet your business need with some combination of accumulation mode, existing sinks, and behavior of your downstream client.
>
> I also wanted to respond to this bit:
>
> >> I was reading this yesterday, but couldn't see how it solved the out-of-order delivery problem here...
>
> > It moves the responsibility of doing things in the right order (and even defining what order is "correct enough") to the runner (and sinks) such that the side effects happen in order...
>
> Again, emphasis on sinks. Many of the ideas there just apply to sinks in general. For example, the sink that writes deltas and the sink that writes whole elements are _different sinks_. The sink that treats retractions as "delete" side effects and the sink that write a row representing the retraction are _different sinks_. These are steps we could take in Beam today.
>
> Kenn
>
>
> On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz <sn...@twitter.com> wrote:
>>
>> Thanks again for all the replies everyone.  Just as a final follow up here, are there any concrete plans on addressing these issues I could start following?  The sink trigger doc seems like a start, but also seems like just a starting point in a larger re-architecture of sinks.
>>
>> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>
>>>
>>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>
>>>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <sn...@twitter.com> wrote:
>>>>>
>>>>>
>>>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>
>>>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org> wrote:
>>>>>>>
>>>>>>> Thanks again for the answers so far!  I really appreciate it.  As for my specific use-case, we're using Bigtable as the final sink, and I'd prefer to keep our writes fully idempotent for other reasons (ie no read-modify-write).  We actually do track tentative vs final values already, but checking that at write-time would impose a pretty big overhead in the write path.
>>>>>>>
>>>>>>> After this I actually instrumented one of my running pipelines to detect these "time traveling" panes, and did see it occurring pretty frequently, particularly when dataflow decides to scale up/down the job, so that was interesting.
>>>>>>>
>>>>>>> From all this, it seems like using a stateful DoFn to prevent time traveling panes from overwriting newer ones is the best solution for now.
>>>>>>
>>>>>>
>>>>>> Note that you can't "filter out" these time traveling panes, because at the next fusion break they might get re-ordered again.
>>>>>
>>>>>
>>>>> Ack, in a general sense.  To solve my specific problem my plan was to ensure the final writer sink would be fused to this filter step (or even build it directly into the DoFn itself that does the write), which would work in my specific case (it seems like at least).
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> My last question / statement is just around general education and documentation about this.  I think the fact that PCollection are unordered makes sense and is pretty intuitive, but fired panes being delivered out-of-order seems very surprising.  I'm curious how many other pipelines exist that run into this (and produce incorrect results!) but people are unaware of.  Is there a way we can call this behavior out?  For example, many of the sample beam projects use early firings, but there's never any mention that the output may be out-of-order.
>>>>>>
>>>>>>
>>>>>> +1 to improving the documentation here. Basically multiple firings become independent elements of the resulting PCollection, they don't retain any association/ordering.
>>>>>>
>>>>>> Multiply-triggered window are difficult to reason about (and not just in this case), https://s.apache.org/beam-sink-triggers is IMHO the right answer.
>>>>>
>>>>>
>>>>> I was reading this yesterday, but couldn't see how it solved the out-of-order delivery problem here.  I do like the overall direction its proposing though, from my work with triggers so far I have found them very difficult to reason about (like you said).
>>>>
>>>>
>>>> It moves the responsibility of doing things in the right order (and even defining what order is "correct enough") to the runner (and sinks) such that the side effects happen in order, even if all the processing did not. To be clear there's still a fair amount of design to make that doc into a workable system...
>>>
>>>
>>> With or without sink triggers, transforms that write need to be pane-index-aware. The outputs themselves may be out of order, but they have sequence numbers on them, so sinks likely need to be made stateful so they can be idempotent in the face of reordering.
>>>
>>> Kenn
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>>
>>>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com> wrote:
>>>>>>>> >
>>>>>>>> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time to go re-think how we do some of our writes w/ early firings then.
>>>>>>>> >
>>>>>>>> > Are there any workarounds to make things happen in-order in dataflow?  eg if the sink gets fused to the output of the GBK operation, will it always happen effectively in order (per key) even though it's not a guarantee?
>>>>>>>>
>>>>>>>> If things get fused, yes. Note that sinks themselves sometimes have fusion barriers though.
>>>>>>>>
>>>>>>>> > I also guess I could keep track of the last pane index my sink has seen, and ignore earlier ones (using state to keep track)?
>>>>>>>>
>>>>>>>> Yes, that would work.
>>>>>>>>
>>>>>>>> What kind of sink are you using? If it supports read-modify-write or some kind of transaction you may be able to mark early results as tentative (which would be useful anyway) and only overwrite tentative ones.
>>>>>>>>
>>>>>>>>
>>>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Correct, even within the same key there's no promise of event time ordering mapping of panes to real time ordering because the downstream operations may happen on a different machine. Multiply triggered windows add an element of non-determinism to the process.
>>>>>>>> >>
>>>>>>>> >> You're also correct that triggering with multiple panes requires lots of care, especially when it comes to operations with side effects (like sinks). Most safe is to only write the final pane to the sink, and handle early triggering in a different way. https://s.apache.org/beam-sink-triggers is a proposal to make this easier to reason about.
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Also to clarify here (I re-read this and realized it could be slightly unclear).  My question is only about in-order delivery of panes.  ie: will pane P always be delivered before P+1.
>>>>>>>> >>>
>>>>>>>> >>> I realize the use of "in-order" before could be confusing, I don't care about the ordering of the elements per-se, just the ordering of the pane delivery.
>>>>>>>> >>>
>>>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at least, the final firing is always guaranteed to be delivered after all early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>>>>>> >>>
>>>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Are you also saying also that even in the first example (Source -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be delivered in-order from the Combine -> Sink transforms?  This seems like a pretty big "got-cha" for correctness if you ever use accumulating triggering.
>>>>>>>> >>>>
>>>>>>>> >>>> I'd also like to point out I'm not talking about a global ordering across the entire PCollection, I'm talking about within the same key after a GBK transform.
>>>>>>>> >>>>
>>>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Due to the nature of distributed processing, order is not preserved. You can, however, inspect the PaneInfo to determine if an element was early, on-time, or late and act accordingly.
>>>>>>>> >>>>>
>>>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com> wrote:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> In my experience ordering is not guaranteed, you may need apply a transformation that sort the elements and then dispatch them sorted out.
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Or uses the Sorter extension for this:
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>>> >>>>>>
>>>>>>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019, 16:31:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how windowing, triggering, and panes work together, and how to ensure correctness throughout a pipeline.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and accumulating panes, this is pretty straight forward.  However, this can get more complicated if we add steps after the CombineByKey, for instance (using the same windowing strategy):
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into batches of N elements.  I can do this with the built-in GroupIntoBatches [1] transform, now my pipeline looks like:
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> This leads to my main question:
>>>>>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the result from early firing F+1 now comes BEFORE the firing F (because it was re-ordered in the GroupIntoBatches).  This would mean that the sink then gets F+1 before F, which means my resulting store has incorrect data (possibly forever if F+1 was the final firing).
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my own ordering back in after GroupIntoBatches.  GIB is an example here, but I imagine this could happen with any GBK type operation.
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>>> >>>>>>>
>>>>>>>> >>>>>>> [1] https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Kenneth Knowles <ke...@apache.org>.
This is a very valid concern so I want to offer advice you can apply today.

"We actually do track tentative vs final values already, but checking that
at write-time would impose a pretty big overhead in the write path."

You mention you are writing to BigTable. If your key is conceptually the
tuple of (isFinal, actual key) then you can avoid read-modify-write, and at
query time (of your BigTable) you can choose to only look at final values.
There are probably variations of this so you can get "on-time" as another
category to fetch.

One reason we have not taken on the large-scale re-architecting is that if
you control the whole pipeline it is usually possible to meet your business
need with some combination of accumulation mode, existing sinks, and
behavior of your downstream client.

I also wanted to respond to this bit:

>> I was reading this yesterday, but couldn't see how it solved the
out-of-order delivery problem here...

> It moves the responsibility of doing things in the right order (and even
defining what order is "correct enough") to the runner (and sinks) such
that the side effects happen in order...

Again, emphasis on sinks. Many of the ideas there just apply to sinks in
general. For example, the sink that writes deltas and the sink that writes
whole elements are _different sinks_. The sink that treats retractions as
"delete" side effects and the sink that write a row representing the
retraction are _different sinks_. These are steps we could take in Beam
today.

Kenn


On Tue, Feb 19, 2019 at 6:55 AM Steve Niemitz <sn...@twitter.com> wrote:

> Thanks again for all the replies everyone.  Just as a final follow up
> here, are there any concrete plans on addressing these issues I could start
> following?  The sink trigger doc seems like a start, but also seems like
> just a starting point in a larger re-architecture of sinks.
>
> On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <kl...@google.com> wrote:
>
>>
>>
>> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <sn...@twitter.com>
>>> wrote:
>>>
>>>>
>>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Thanks again for the answers so far!  I really appreciate it.  As for
>>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>>>> read-modify-write).  We actually do track tentative vs final values
>>>>>> already, but checking that at write-time would impose a pretty big overhead
>>>>>> in the write path.
>>>>>>
>>>>>> After this I actually instrumented one of my running pipelines to
>>>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>>>> frequently, particularly when dataflow decides to scale up/down the job, so
>>>>>> that was interesting.
>>>>>>
>>>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>>>> traveling panes from overwriting newer ones is the best solution for now.
>>>>>>
>>>>>
>>>>> Note that you can't "filter out" these time traveling panes, because
>>>>> at the next fusion break they might get re-ordered again.
>>>>>
>>>>
>>>> Ack, in a general sense.  To solve my specific problem my plan was to
>>>> ensure the final writer sink would be fused to this filter step (or even
>>>> build it directly into the DoFn itself that does the write), which would
>>>> work in my specific case (it seems like at least).
>>>>
>>>>
>>>>>
>>>>>
>>>>>> My last question / statement is just around general education and
>>>>>> documentation about this.  I think the fact that PCollection are unordered
>>>>>> makes sense and is pretty intuitive, but fired panes being delivered
>>>>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>>>>> exist that run into this (and produce incorrect results!) but people are
>>>>>> unaware of.  Is there a way we can call this behavior out?  For example,
>>>>>> many of the sample beam projects use early firings, but there's never any
>>>>>> mention that the output may be out-of-order.
>>>>>>
>>>>>
>>>>> +1 to improving the documentation here. Basically multiple firings
>>>>> become independent elements of the resulting PCollection, they don't retain
>>>>> any association/ordering.
>>>>>
>>>>> Multiply-triggered window are difficult to reason about (and not just
>>>>> in this case), https://s.apache.org/beam-sink-triggers is IMHO the
>>>>> right answer.
>>>>>
>>>>
>>>> I was reading this yesterday, but couldn't see how it solved the
>>>> out-of-order delivery problem here.  I do like the overall direction its
>>>> proposing though, from my work with triggers so far I have found them very
>>>> difficult to reason about (like you said).
>>>>
>>>
>>> It moves the responsibility of doing things in the right order (and even
>>> defining what order is "correct enough") to the runner (and sinks) such
>>> that the side effects happen in order, even if all the processing did not.
>>> To be clear there's still a fair amount of design to make that doc into a
>>> workable system...
>>>
>>
>> With or without sink triggers, transforms that write need to be
>> pane-index-aware. The outputs themselves may be out of order, but they have
>> sequence numbers on them, so sinks likely need to be made stateful so they
>> can be idempotent in the face of reordering.
>>
>> Kenn
>>
>>
>>>
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>>>>>> Time to go re-think how we do some of our writes w/ early firings then.
>>>>>>> >
>>>>>>> > Are there any workarounds to make things happen in-order in
>>>>>>> dataflow?  eg if the sink gets fused to the output of the GBK operation,
>>>>>>> will it always happen effectively in order (per key) even though it's not a
>>>>>>> guarantee?
>>>>>>>
>>>>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>>>>> fusion barriers though.
>>>>>>>
>>>>>>> > I also guess I could keep track of the last pane index my sink has
>>>>>>> seen, and ignore earlier ones (using state to keep track)?
>>>>>>>
>>>>>>> Yes, that would work.
>>>>>>>
>>>>>>> What kind of sink are you using? If it supports read-modify-write or
>>>>>>> some kind of transaction you may be able to mark early results as tentative
>>>>>>> (which would be useful anyway) and only overwrite tentative ones.
>>>>>>>
>>>>>>>
>>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <
>>>>>>> robertwb@google.com> wrote:
>>>>>>> >>
>>>>>>> >> Correct, even within the same key there's no promise of event
>>>>>>> time ordering mapping of panes to real time ordering because the downstream
>>>>>>> operations may happen on a different machine. Multiply triggered windows
>>>>>>> add an element of non-determinism to the process.
>>>>>>> >>
>>>>>>> >> You're also correct that triggering with multiple panes requires
>>>>>>> lots of care, especially when it comes to operations with side effects
>>>>>>> (like sinks). Most safe is to only write the final pane to the sink, and
>>>>>>> handle early triggering in a different way.
>>>>>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>>>>> easier to reason about.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <
>>>>>>> sniemitz@apache.org> wrote:
>>>>>>> >>>
>>>>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>>>>> slightly unclear).  My question is only about in-order delivery of panes.
>>>>>>>  ie: will pane P always be delivered before P+1.
>>>>>>> >>>
>>>>>>> >>> I realize the use of "in-order" before could be confusing, I
>>>>>>> don't care about the ordering of the elements per-se, just the ordering of
>>>>>>> the pane delivery.
>>>>>>> >>>
>>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0,
>>>>>>> P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR
>>>>>>> at least, the final firing is always guaranteed to be delivered after all
>>>>>>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>>>>> >>>
>>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <
>>>>>>> sniemitz@apache.org> wrote:
>>>>>>> >>>>
>>>>>>> >>>> Are you also saying also that even in the first example (Source
>>>>>>> -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>>>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>>>>>> pretty big "got-cha" for correctness if you ever use accumulating
>>>>>>> triggering.
>>>>>>> >>>>
>>>>>>> >>>> I'd also like to point out I'm not talking about a global
>>>>>>> ordering across the entire PCollection, I'm talking about within the same
>>>>>>> key after a GBK transform.
>>>>>>> >>>>
>>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>>>>>> robertwb@google.com> wrote:
>>>>>>> >>>>>
>>>>>>> >>>>> Due to the nature of distributed processing, order is not
>>>>>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>>>>>> element was early, on-time, or late and act accordingly.
>>>>>>> >>>>>
>>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>>>>>> jcgarciam@gmail.com> wrote:
>>>>>>> >>>>>>
>>>>>>> >>>>>> In my experience ordering is not guaranteed, you may need
>>>>>>> apply a transformation that sort the elements and then dispatch them sorted
>>>>>>> out.
>>>>>>> >>>>>>
>>>>>>> >>>>>> Or uses the Sorter extension for this:
>>>>>>> >>>>>>
>>>>>>> >>>>>>
>>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>> >>>>>>
>>>>>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>>>>>>> 2019, 16:31:
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>>>> windowing, triggering, and panes work together, and how to ensure
>>>>>>> correctness throughout a pipeline.
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Lets assume I have a very simple streaming pipeline that
>>>>>>> looks like:
>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute,
>>>>>>> and accumulating panes, this is pretty straight forward.  However, this can
>>>>>>> get more complicated if we add steps after the CombineByKey, for instance
>>>>>>> (using the same windowing strategy):
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>>>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>>>>>> [1] transform, now my pipeline looks like:
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> This leads to my main question:
>>>>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that
>>>>>>> the result from early firing F+1 now comes BEFORE the firing F (because it
>>>>>>> was re-ordered in the GroupIntoBatches).  This would mean that the sink
>>>>>>> then gets F+1 before F, which means my resulting store has incorrect data
>>>>>>> (possibly forever if F+1 was the final firing).
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>>>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>>>>>> example here, but I imagine this could happen with any GBK type operation.
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> [1]
>>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>>
>>>>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@twitter.com>.
Thanks again for all the replies everyone.  Just as a final follow up here,
are there any concrete plans on addressing these issues I could start
following?  The sink trigger doc seems like a start, but also seems like
just a starting point in a larger re-architecture of sinks.

On Fri, Feb 15, 2019 at 4:34 PM Kenneth Knowles <kl...@google.com> wrote:

>
>
> On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <sn...@twitter.com>
>> wrote:
>>
>>>
>>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>>
>>>>> Thanks again for the answers so far!  I really appreciate it.  As for
>>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>>> read-modify-write).  We actually do track tentative vs final values
>>>>> already, but checking that at write-time would impose a pretty big overhead
>>>>> in the write path.
>>>>>
>>>>> After this I actually instrumented one of my running pipelines to
>>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>>> frequently, particularly when dataflow decides to scale up/down the job, so
>>>>> that was interesting.
>>>>>
>>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>>> traveling panes from overwriting newer ones is the best solution for now.
>>>>>
>>>>
>>>> Note that you can't "filter out" these time traveling panes, because at
>>>> the next fusion break they might get re-ordered again.
>>>>
>>>
>>> Ack, in a general sense.  To solve my specific problem my plan was to
>>> ensure the final writer sink would be fused to this filter step (or even
>>> build it directly into the DoFn itself that does the write), which would
>>> work in my specific case (it seems like at least).
>>>
>>>
>>>>
>>>>
>>>>> My last question / statement is just around general education and
>>>>> documentation about this.  I think the fact that PCollection are unordered
>>>>> makes sense and is pretty intuitive, but fired panes being delivered
>>>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>>>> exist that run into this (and produce incorrect results!) but people are
>>>>> unaware of.  Is there a way we can call this behavior out?  For example,
>>>>> many of the sample beam projects use early firings, but there's never any
>>>>> mention that the output may be out-of-order.
>>>>>
>>>>
>>>> +1 to improving the documentation here. Basically multiple firings
>>>> become independent elements of the resulting PCollection, they don't retain
>>>> any association/ordering.
>>>>
>>>> Multiply-triggered window are difficult to reason about (and not just
>>>> in this case), https://s.apache.org/beam-sink-triggers is IMHO the
>>>> right answer.
>>>>
>>>
>>> I was reading this yesterday, but couldn't see how it solved the
>>> out-of-order delivery problem here.  I do like the overall direction its
>>> proposing though, from my work with triggers so far I have found them very
>>> difficult to reason about (like you said).
>>>
>>
>> It moves the responsibility of doing things in the right order (and even
>> defining what order is "correct enough") to the runner (and sinks) such
>> that the side effects happen in order, even if all the processing did not.
>> To be clear there's still a fair amount of design to make that doc into a
>> workable system...
>>
>
> With or without sink triggers, transforms that write need to be
> pane-index-aware. The outputs themselves may be out of order, but they have
> sequence numbers on them, so sinks likely need to be made stateful so they
> can be idempotent in the face of reordering.
>
> Kenn
>
>
>>
>>
>>>
>>>
>>>>
>>>>
>>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>>>>> Time to go re-think how we do some of our writes w/ early firings then.
>>>>>> >
>>>>>> > Are there any workarounds to make things happen in-order in
>>>>>> dataflow?  eg if the sink gets fused to the output of the GBK operation,
>>>>>> will it always happen effectively in order (per key) even though it's not a
>>>>>> guarantee?
>>>>>>
>>>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>>>> fusion barriers though.
>>>>>>
>>>>>> > I also guess I could keep track of the last pane index my sink has
>>>>>> seen, and ignore earlier ones (using state to keep track)?
>>>>>>
>>>>>> Yes, that would work.
>>>>>>
>>>>>> What kind of sink are you using? If it supports read-modify-write or
>>>>>> some kind of transaction you may be able to mark early results as tentative
>>>>>> (which would be useful anyway) and only overwrite tentative ones.
>>>>>>
>>>>>>
>>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <
>>>>>> robertwb@google.com> wrote:
>>>>>> >>
>>>>>> >> Correct, even within the same key there's no promise of event time
>>>>>> ordering mapping of panes to real time ordering because the downstream
>>>>>> operations may happen on a different machine. Multiply triggered windows
>>>>>> add an element of non-determinism to the process.
>>>>>> >>
>>>>>> >> You're also correct that triggering with multiple panes requires
>>>>>> lots of care, especially when it comes to operations with side effects
>>>>>> (like sinks). Most safe is to only write the final pane to the sink, and
>>>>>> handle early triggering in a different way.
>>>>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>>>> easier to reason about.
>>>>>> >>
>>>>>> >>
>>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>>>> slightly unclear).  My question is only about in-order delivery of panes.
>>>>>>  ie: will pane P always be delivered before P+1.
>>>>>> >>>
>>>>>> >>> I realize the use of "in-order" before could be confusing, I
>>>>>> don't care about the ordering of the elements per-se, just the ordering of
>>>>>> the pane delivery.
>>>>>> >>>
>>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0,
>>>>>> P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR
>>>>>> at least, the final firing is always guaranteed to be delivered after all
>>>>>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>>>> >>>
>>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <
>>>>>> sniemitz@apache.org> wrote:
>>>>>> >>>>
>>>>>> >>>> Are you also saying also that even in the first example (Source
>>>>>> -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>>>>> pretty big "got-cha" for correctness if you ever use accumulating
>>>>>> triggering.
>>>>>> >>>>
>>>>>> >>>> I'd also like to point out I'm not talking about a global
>>>>>> ordering across the entire PCollection, I'm talking about within the same
>>>>>> key after a GBK transform.
>>>>>> >>>>
>>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>>>>> robertwb@google.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>> Due to the nature of distributed processing, order is not
>>>>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>>>>> element was early, on-time, or late and act accordingly.
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>>>>> jcgarciam@gmail.com> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> In my experience ordering is not guaranteed, you may need
>>>>>> apply a transformation that sort the elements and then dispatch them sorted
>>>>>> out.
>>>>>> >>>>>>
>>>>>> >>>>>> Or uses the Sorter extension for this:
>>>>>> >>>>>>
>>>>>> >>>>>>
>>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>> >>>>>>
>>>>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>>>>>> 2019, 16:31:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>>> windowing, triggering, and panes work together, and how to ensure
>>>>>> correctness throughout a pipeline.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Lets assume I have a very simple streaming pipeline that
>>>>>> looks like:
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute,
>>>>>> and accumulating panes, this is pretty straight forward.  However, this can
>>>>>> get more complicated if we add steps after the CombineByKey, for instance
>>>>>> (using the same windowing strategy):
>>>>>> >>>>>>>
>>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>>>>> [1] transform, now my pipeline looks like:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>> >>>>>>>
>>>>>> >>>>>>> This leads to my main question:
>>>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that
>>>>>> the result from early firing F+1 now comes BEFORE the firing F (because it
>>>>>> was re-ordered in the GroupIntoBatches).  This would mean that the sink
>>>>>> then gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever if F+1 was the final firing).
>>>>>> >>>>>>>
>>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>>>>> example here, but I imagine this could happen with any GBK type operation.
>>>>>> >>>>>>>
>>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>> >>>>>>>
>>>>>> >>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Kenneth Knowles <kl...@google.com>.
On Wed, Feb 13, 2019 at 3:11 PM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <sn...@twitter.com>
> wrote:
>
>>
>> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> Thanks again for the answers so far!  I really appreciate it.  As for
>>>> my specific use-case, we're using Bigtable as the final sink, and I'd
>>>> prefer to keep our writes fully idempotent for other reasons (ie no
>>>> read-modify-write).  We actually do track tentative vs final values
>>>> already, but checking that at write-time would impose a pretty big overhead
>>>> in the write path.
>>>>
>>>> After this I actually instrumented one of my running pipelines to
>>>> detect these "time traveling" panes, and did see it occurring pretty
>>>> frequently, particularly when dataflow decides to scale up/down the job, so
>>>> that was interesting.
>>>>
>>>> From all this, it seems like using a stateful DoFn to prevent time
>>>> traveling panes from overwriting newer ones is the best solution for now.
>>>>
>>>
>>> Note that you can't "filter out" these time traveling panes, because at
>>> the next fusion break they might get re-ordered again.
>>>
>>
>> Ack, in a general sense.  To solve my specific problem my plan was to
>> ensure the final writer sink would be fused to this filter step (or even
>> build it directly into the DoFn itself that does the write), which would
>> work in my specific case (it seems like at least).
>>
>>
>>>
>>>
>>>> My last question / statement is just around general education and
>>>> documentation about this.  I think the fact that PCollection are unordered
>>>> makes sense and is pretty intuitive, but fired panes being delivered
>>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>>> exist that run into this (and produce incorrect results!) but people are
>>>> unaware of.  Is there a way we can call this behavior out?  For example,
>>>> many of the sample beam projects use early firings, but there's never any
>>>> mention that the output may be out-of-order.
>>>>
>>>
>>> +1 to improving the documentation here. Basically multiple firings
>>> become independent elements of the resulting PCollection, they don't retain
>>> any association/ordering.
>>>
>>> Multiply-triggered window are difficult to reason about (and not just in
>>> this case), https://s.apache.org/beam-sink-triggers is IMHO the right
>>> answer.
>>>
>>
>> I was reading this yesterday, but couldn't see how it solved the
>> out-of-order delivery problem here.  I do like the overall direction its
>> proposing though, from my work with triggers so far I have found them very
>> difficult to reason about (like you said).
>>
>
> It moves the responsibility of doing things in the right order (and even
> defining what order is "correct enough") to the runner (and sinks) such
> that the side effects happen in order, even if all the processing did not.
> To be clear there's still a fair amount of design to make that doc into a
> workable system...
>

With or without sink triggers, transforms that write need to be
pane-index-aware. The outputs themselves may be out of order, but they have
sequence numbers on them, so sinks likely need to be made stateful so they
can be idempotent in the face of reordering.

Kenn


>
>
>>
>>
>>>
>>>
>>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>>>>> wrote:
>>>>> >
>>>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>>>> Time to go re-think how we do some of our writes w/ early firings then.
>>>>> >
>>>>> > Are there any workarounds to make things happen in-order in
>>>>> dataflow?  eg if the sink gets fused to the output of the GBK operation,
>>>>> will it always happen effectively in order (per key) even though it's not a
>>>>> guarantee?
>>>>>
>>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>>> fusion barriers though.
>>>>>
>>>>> > I also guess I could keep track of the last pane index my sink has
>>>>> seen, and ignore earlier ones (using state to keep track)?
>>>>>
>>>>> Yes, that would work.
>>>>>
>>>>> What kind of sink are you using? If it supports read-modify-write or
>>>>> some kind of transaction you may be able to mark early results as tentative
>>>>> (which would be useful anyway) and only overwrite tentative ones.
>>>>>
>>>>>
>>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Correct, even within the same key there's no promise of event time
>>>>> ordering mapping of panes to real time ordering because the downstream
>>>>> operations may happen on a different machine. Multiply triggered windows
>>>>> add an element of non-determinism to the process.
>>>>> >>
>>>>> >> You're also correct that triggering with multiple panes requires
>>>>> lots of care, especially when it comes to operations with side effects
>>>>> (like sinks). Most safe is to only write the final pane to the sink, and
>>>>> handle early triggering in a different way.
>>>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>>> easier to reason about.
>>>>> >>
>>>>> >>
>>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>>>>> wrote:
>>>>> >>>
>>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>>> slightly unclear).  My question is only about in-order delivery of panes.
>>>>>  ie: will pane P always be delivered before P+1.
>>>>> >>>
>>>>> >>> I realize the use of "in-order" before could be confusing, I don't
>>>>> care about the ordering of the elements per-se, just the ordering of the
>>>>> pane delivery.
>>>>> >>>
>>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0,
>>>>> P1, P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR
>>>>> at least, the final firing is always guaranteed to be delivered after all
>>>>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>>> >>>
>>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <
>>>>> sniemitz@apache.org> wrote:
>>>>> >>>>
>>>>> >>>> Are you also saying also that even in the first example (Source
>>>>> -> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>>>> pretty big "got-cha" for correctness if you ever use accumulating
>>>>> triggering.
>>>>> >>>>
>>>>> >>>> I'd also like to point out I'm not talking about a global
>>>>> ordering across the entire PCollection, I'm talking about within the same
>>>>> key after a GBK transform.
>>>>> >>>>
>>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>>>> robertwb@google.com> wrote:
>>>>> >>>>>
>>>>> >>>>> Due to the nature of distributed processing, order is not
>>>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>>>> element was early, on-time, or late and act accordingly.
>>>>> >>>>>
>>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>>>> jcgarciam@gmail.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> In my experience ordering is not guaranteed, you may need apply
>>>>> a transformation that sort the elements and then dispatch them sorted out.
>>>>> >>>>>>
>>>>> >>>>>> Or uses the Sorter extension for this:
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>> >>>>>>
>>>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>>>>> 2019, 16:31:
>>>>> >>>>>>>
>>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>>> windowing, triggering, and panes work together, and how to ensure
>>>>> correctness throughout a pipeline.
>>>>> >>>>>>>
>>>>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>>>>> like:
>>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>> >>>>>>>
>>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>>> more complicated if we add steps after the CombineByKey, for instance
>>>>> (using the same windowing strategy):
>>>>> >>>>>>>
>>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>>>> [1] transform, now my pipeline looks like:
>>>>> >>>>>>>
>>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>> >>>>>>>
>>>>> >>>>>>> This leads to my main question:
>>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that
>>>>> the result from early firing F+1 now comes BEFORE the firing F (because it
>>>>> was re-ordered in the GroupIntoBatches).  This would mean that the sink
>>>>> then gets F+1 before F, which means my resulting store has incorrect data
>>>>> (possibly forever if F+1 was the final firing).
>>>>> >>>>>>>
>>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>>>> example here, but I imagine this could happen with any GBK type operation.
>>>>> >>>>>>>
>>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>> >>>>>>>
>>>>> >>>>>>> [1]
>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>
>>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 13, 2019 at 11:39 PM Steve Niemitz <sn...@twitter.com> wrote:

>
> On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> Thanks again for the answers so far!  I really appreciate it.  As for my
>>> specific use-case, we're using Bigtable as the final sink, and I'd prefer
>>> to keep our writes fully idempotent for other reasons (ie no
>>> read-modify-write).  We actually do track tentative vs final values
>>> already, but checking that at write-time would impose a pretty big overhead
>>> in the write path.
>>>
>>> After this I actually instrumented one of my running pipelines to detect
>>> these "time traveling" panes, and did see it occurring pretty frequently,
>>> particularly when dataflow decides to scale up/down the job, so that was
>>> interesting.
>>>
>>> From all this, it seems like using a stateful DoFn to prevent time
>>> traveling panes from overwriting newer ones is the best solution for now.
>>>
>>
>> Note that you can't "filter out" these time traveling panes, because at
>> the next fusion break they might get re-ordered again.
>>
>
> Ack, in a general sense.  To solve my specific problem my plan was to
> ensure the final writer sink would be fused to this filter step (or even
> build it directly into the DoFn itself that does the write), which would
> work in my specific case (it seems like at least).
>
>
>>
>>
>>> My last question / statement is just around general education and
>>> documentation about this.  I think the fact that PCollection are unordered
>>> makes sense and is pretty intuitive, but fired panes being delivered
>>> out-of-order seems very surprising.  I'm curious how many other pipelines
>>> exist that run into this (and produce incorrect results!) but people are
>>> unaware of.  Is there a way we can call this behavior out?  For example,
>>> many of the sample beam projects use early firings, but there's never any
>>> mention that the output may be out-of-order.
>>>
>>
>> +1 to improving the documentation here. Basically multiple firings become
>> independent elements of the resulting PCollection, they don't retain any
>> association/ordering.
>>
>> Multiply-triggered window are difficult to reason about (and not just in
>> this case), https://s.apache.org/beam-sink-triggers is IMHO the right
>> answer.
>>
>
> I was reading this yesterday, but couldn't see how it solved the
> out-of-order delivery problem here.  I do like the overall direction its
> proposing though, from my work with triggers so far I have found them very
> difficult to reason about (like you said).
>

It moves the responsibility of doing things in the right order (and even
defining what order is "correct enough") to the runner (and sinks) such
that the side effects happen in order, even if all the processing did not.
To be clear there's still a fair amount of design to make that doc into a
workable system...


>
>
>>
>>
>>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>>>> wrote:
>>>> >
>>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>>> Time to go re-think how we do some of our writes w/ early firings then.
>>>> >
>>>> > Are there any workarounds to make things happen in-order in dataflow?
>>>>  eg if the sink gets fused to the output of the GBK operation, will it
>>>> always happen effectively in order (per key) even though it's not a
>>>> guarantee?
>>>>
>>>> If things get fused, yes. Note that sinks themselves sometimes have
>>>> fusion barriers though.
>>>>
>>>> > I also guess I could keep track of the last pane index my sink has
>>>> seen, and ignore earlier ones (using state to keep track)?
>>>>
>>>> Yes, that would work.
>>>>
>>>> What kind of sink are you using? If it supports read-modify-write or
>>>> some kind of transaction you may be able to mark early results as tentative
>>>> (which would be useful anyway) and only overwrite tentative ones.
>>>>
>>>>
>>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >>
>>>> >> Correct, even within the same key there's no promise of event time
>>>> ordering mapping of panes to real time ordering because the downstream
>>>> operations may happen on a different machine. Multiply triggered windows
>>>> add an element of non-determinism to the process.
>>>> >>
>>>> >> You're also correct that triggering with multiple panes requires
>>>> lots of care, especially when it comes to operations with side effects
>>>> (like sinks). Most safe is to only write the final pane to the sink, and
>>>> handle early triggering in a different way.
>>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>>> easier to reason about.
>>>> >>
>>>> >>
>>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>> >>>
>>>> >>> Also to clarify here (I re-read this and realized it could be
>>>> slightly unclear).  My question is only about in-order delivery of panes.
>>>>  ie: will pane P always be delivered before P+1.
>>>> >>>
>>>> >>> I realize the use of "in-order" before could be confusing, I don't
>>>> care about the ordering of the elements per-se, just the ordering of the
>>>> pane delivery.
>>>> >>>
>>>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>>>> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>>>> least, the final firing is always guaranteed to be delivered after all
>>>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>> >>>
>>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>> >>>>
>>>> >>>> Are you also saying also that even in the first example (Source ->
>>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>>> pretty big "got-cha" for correctness if you ever use accumulating
>>>> triggering.
>>>> >>>>
>>>> >>>> I'd also like to point out I'm not talking about a global ordering
>>>> across the entire PCollection, I'm talking about within the same key after
>>>> a GBK transform.
>>>> >>>>
>>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>>> robertwb@google.com> wrote:
>>>> >>>>>
>>>> >>>>> Due to the nature of distributed processing, order is not
>>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>>> element was early, on-time, or late and act accordingly.
>>>> >>>>>
>>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>>> jcgarciam@gmail.com> wrote:
>>>> >>>>>>
>>>> >>>>>> In my experience ordering is not guaranteed, you may need apply
>>>> a transformation that sort the elements and then dispatch them sorted out.
>>>> >>>>>>
>>>> >>>>>> Or uses the Sorter extension for this:
>>>> >>>>>>
>>>> >>>>>>
>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>> >>>>>>
>>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>>>> 2019, 16:31:
>>>> >>>>>>>
>>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>>> windowing, triggering, and panes work together, and how to ensure
>>>> correctness throughout a pipeline.
>>>> >>>>>>>
>>>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>>>> like:
>>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>> >>>>>>>
>>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>> more complicated if we add steps after the CombineByKey, for instance
>>>> (using the same windowing strategy):
>>>> >>>>>>>
>>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>>> [1] transform, now my pipeline looks like:
>>>> >>>>>>>
>>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>> >>>>>>>
>>>> >>>>>>> This leads to my main question:
>>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that
>>>> the result from early firing F+1 now comes BEFORE the firing F (because it
>>>> was re-ordered in the GroupIntoBatches).  This would mean that the sink
>>>> then gets F+1 before F, which means my resulting store has incorrect data
>>>> (possibly forever if F+1 was the final firing).
>>>> >>>>>>>
>>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>>> example here, but I imagine this could happen with any GBK type operation.
>>>> >>>>>>>
>>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>>> >>>>>>>
>>>> >>>>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>
>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@twitter.com>.
On Wed, Feb 13, 2019 at 5:01 PM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> Thanks again for the answers so far!  I really appreciate it.  As for my
>> specific use-case, we're using Bigtable as the final sink, and I'd prefer
>> to keep our writes fully idempotent for other reasons (ie no
>> read-modify-write).  We actually do track tentative vs final values
>> already, but checking that at write-time would impose a pretty big overhead
>> in the write path.
>>
>> After this I actually instrumented one of my running pipelines to detect
>> these "time traveling" panes, and did see it occurring pretty frequently,
>> particularly when dataflow decides to scale up/down the job, so that was
>> interesting.
>>
>> From all this, it seems like using a stateful DoFn to prevent time
>> traveling panes from overwriting newer ones is the best solution for now.
>>
>
> Note that you can't "filter out" these time traveling panes, because at
> the next fusion break they might get re-ordered again.
>

Ack, in a general sense.  To solve my specific problem my plan was to
ensure the final writer sink would be fused to this filter step (or even
build it directly into the DoFn itself that does the write), which would
work in my specific case (it seems like at least).


>
>
>> My last question / statement is just around general education and
>> documentation about this.  I think the fact that PCollection are unordered
>> makes sense and is pretty intuitive, but fired panes being delivered
>> out-of-order seems very surprising.  I'm curious how many other pipelines
>> exist that run into this (and produce incorrect results!) but people are
>> unaware of.  Is there a way we can call this behavior out?  For example,
>> many of the sample beam projects use early firings, but there's never any
>> mention that the output may be out-of-order.
>>
>
> +1 to improving the documentation here. Basically multiple firings become
> independent elements of the resulting PCollection, they don't retain any
> association/ordering.
>
> Multiply-triggered window are difficult to reason about (and not just in
> this case), https://s.apache.org/beam-sink-triggers is IMHO the right
> answer.
>

I was reading this yesterday, but couldn't see how it solved the
out-of-order delivery problem here.  I do like the overall direction its
proposing though, from my work with triggers so far I have found them very
difficult to reason about (like you said).


>
>
>> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>>> wrote:
>>> >
>>> > wow, thats super unexpected and dangerous, thanks for clarifying!
>>> Time to go re-think how we do some of our writes w/ early firings then.
>>> >
>>> > Are there any workarounds to make things happen in-order in dataflow?
>>>  eg if the sink gets fused to the output of the GBK operation, will it
>>> always happen effectively in order (per key) even though it's not a
>>> guarantee?
>>>
>>> If things get fused, yes. Note that sinks themselves sometimes have
>>> fusion barriers though.
>>>
>>> > I also guess I could keep track of the last pane index my sink has
>>> seen, and ignore earlier ones (using state to keep track)?
>>>
>>> Yes, that would work.
>>>
>>> What kind of sink are you using? If it supports read-modify-write or
>>> some kind of transaction you may be able to mark early results as tentative
>>> (which would be useful anyway) and only overwrite tentative ones.
>>>
>>>
>>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Correct, even within the same key there's no promise of event time
>>> ordering mapping of panes to real time ordering because the downstream
>>> operations may happen on a different machine. Multiply triggered windows
>>> add an element of non-determinism to the process.
>>> >>
>>> >> You're also correct that triggering with multiple panes requires lots
>>> of care, especially when it comes to operations with side effects (like
>>> sinks). Most safe is to only write the final pane to the sink, and handle
>>> early triggering in a different way.
>>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>>> easier to reason about.
>>> >>
>>> >>
>>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>> >>>
>>> >>> Also to clarify here (I re-read this and realized it could be
>>> slightly unclear).  My question is only about in-order delivery of panes.
>>>  ie: will pane P always be delivered before P+1.
>>> >>>
>>> >>> I realize the use of "in-order" before could be confusing, I don't
>>> care about the ordering of the elements per-se, just the ordering of the
>>> pane delivery.
>>> >>>
>>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>>> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>>> least, the final firing is always guaranteed to be delivered after all
>>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>> >>>
>>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>> >>>>
>>> >>>> Are you also saying also that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>> pretty big "got-cha" for correctness if you ever use accumulating
>>> triggering.
>>> >>>>
>>> >>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection, I'm talking about within the same key after
>>> a GBK transform.
>>> >>>>
>>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>>> robertwb@google.com> wrote:
>>> >>>>>
>>> >>>>> Due to the nature of distributed processing, order is not
>>> preserved. You can, however, inspect the PaneInfo to determine if an
>>> element was early, on-time, or late and act accordingly.
>>> >>>>>
>>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>>> jcgarciam@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> In my experience ordering is not guaranteed, you may need apply a
>>> transformation that sort the elements and then dispatch them sorted out.
>>> >>>>>>
>>> >>>>>> Or uses the Sorter extension for this:
>>> >>>>>>
>>> >>>>>>
>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>> >>>>>>
>>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>>> 2019, 16:31:
>>> >>>>>>>
>>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>>> windowing, triggering, and panes work together, and how to ensure
>>> correctness throughout a pipeline.
>>> >>>>>>>
>>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>>> like:
>>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>>> >>>>>>>
>>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>> accumulating panes, this is pretty straight forward.  However, this can get
>>> more complicated if we add steps after the CombineByKey, for instance
>>> (using the same windowing strategy):
>>> >>>>>>>
>>> >>>>>>> Say I want to buffer the results of the CombineByKey into
>>> batches of N elements.  I can do this with the built-in GroupIntoBatches
>>> [1] transform, now my pipeline looks like:
>>> >>>>>>>
>>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>> >>>>>>>
>>> >>>>>>> This leads to my main question:
>>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>>> result from early firing F+1 now comes BEFORE the firing F (because it was
>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>> gets F+1 before F, which means my resulting store has incorrect data
>>> (possibly forever if F+1 was the final firing).
>>> >>>>>>>
>>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>>> example here, but I imagine this could happen with any GBK type operation.
>>> >>>>>>>
>>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>>> >>>>>>>
>>> >>>>>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>
>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 13, 2019 at 5:07 PM Steve Niemitz <sn...@apache.org> wrote:

> Thanks again for the answers so far!  I really appreciate it.  As for my
> specific use-case, we're using Bigtable as the final sink, and I'd prefer
> to keep our writes fully idempotent for other reasons (ie no
> read-modify-write).  We actually do track tentative vs final values
> already, but checking that at write-time would impose a pretty big overhead
> in the write path.
>
> After this I actually instrumented one of my running pipelines to detect
> these "time traveling" panes, and did see it occurring pretty frequently,
> particularly when dataflow decides to scale up/down the job, so that was
> interesting.
>
> From all this, it seems like using a stateful DoFn to prevent time
> traveling panes from overwriting newer ones is the best solution for now.
>

Note that you can't "filter out" these time traveling panes, because at the
next fusion break they might get re-ordered again.


> My last question / statement is just around general education and
> documentation about this.  I think the fact that PCollection are unordered
> makes sense and is pretty intuitive, but fired panes being delivered
> out-of-order seems very surprising.  I'm curious how many other pipelines
> exist that run into this (and produce incorrect results!) but people are
> unaware of.  Is there a way we can call this behavior out?  For example,
> many of the sample beam projects use early firings, but there's never any
> mention that the output may be out-of-order.
>

+1 to improving the documentation here. Basically multiple firings become
independent elements of the resulting PCollection, they don't retain any
association/ordering.

Multiply-triggered window are difficult to reason about (and not just in
this case), https://s.apache.org/beam-sink-triggers is IMHO the right
answer.


> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>> wrote:
>> >
>> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
>> to go re-think how we do some of our writes w/ early firings then.
>> >
>> > Are there any workarounds to make things happen in-order in dataflow?
>>  eg if the sink gets fused to the output of the GBK operation, will it
>> always happen effectively in order (per key) even though it's not a
>> guarantee?
>>
>> If things get fused, yes. Note that sinks themselves sometimes have
>> fusion barriers though.
>>
>> > I also guess I could keep track of the last pane index my sink has
>> seen, and ignore earlier ones (using state to keep track)?
>>
>> Yes, that would work.
>>
>> What kind of sink are you using? If it supports read-modify-write or some
>> kind of transaction you may be able to mark early results as tentative
>> (which would be useful anyway) and only overwrite tentative ones.
>>
>>
>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Correct, even within the same key there's no promise of event time
>> ordering mapping of panes to real time ordering because the downstream
>> operations may happen on a different machine. Multiply triggered windows
>> add an element of non-determinism to the process.
>> >>
>> >> You're also correct that triggering with multiple panes requires lots
>> of care, especially when it comes to operations with side effects (like
>> sinks). Most safe is to only write the final pane to the sink, and handle
>> early triggering in a different way.
>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>> easier to reason about.
>> >>
>> >>
>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>>
>> >>> Also to clarify here (I re-read this and realized it could be
>> slightly unclear).  My question is only about in-order delivery of panes.
>>  ie: will pane P always be delivered before P+1.
>> >>>
>> >>> I realize the use of "in-order" before could be confusing, I don't
>> care about the ordering of the elements per-se, just the ordering of the
>> pane delivery.
>> >>>
>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>> >>>
>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>>>
>> >>>> Are you also saying also that even in the first example (Source ->
>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>> pretty big "got-cha" for correctness if you ever use accumulating
>> triggering.
>> >>>>
>> >>>> I'd also like to point out I'm not talking about a global ordering
>> across the entire PCollection, I'm talking about within the same key after
>> a GBK transform.
>> >>>>
>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>>
>> >>>>> Due to the nature of distributed processing, order is not
>> preserved. You can, however, inspect the PaneInfo to determine if an
>> element was early, on-time, or late and act accordingly.
>> >>>>>
>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>> jcgarciam@gmail.com> wrote:
>> >>>>>>
>> >>>>>> In my experience ordering is not guaranteed, you may need apply a
>> transformation that sort the elements and then dispatch them sorted out.
>> >>>>>>
>> >>>>>> Or uses the Sorter extension for this:
>> >>>>>>
>> >>>>>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> >>>>>>
>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>> 2019, 16:31:
>> >>>>>>>
>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>> windowing, triggering, and panes work together, and how to ensure
>> correctness throughout a pipeline.
>> >>>>>>>
>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>> like:
>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>> >>>>>>>
>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>> accumulating panes, this is pretty straight forward.  However, this can get
>> more complicated if we add steps after the CombineByKey, for instance
>> (using the same windowing strategy):
>> >>>>>>>
>> >>>>>>> Say I want to buffer the results of the CombineByKey into batches
>> of N elements.  I can do this with the built-in GroupIntoBatches [1]
>> transform, now my pipeline looks like:
>> >>>>>>>
>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>> >>>>>>>
>> >>>>>>> This leads to my main question:
>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>> result from early firing F+1 now comes BEFORE the firing F (because it was
>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>> gets F+1 before F, which means my resulting store has incorrect data
>> (possibly forever if F+1 was the final firing).
>> >>>>>>>
>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>> example here, but I imagine this could happen with any GBK type operation.
>> >>>>>>>
>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>> >>>>>>>
>> >>>>>>> [1]
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>
>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Feb 13, 2019 at 8:07 AM Steve Niemitz <sn...@apache.org> wrote:

> Thanks again for the answers so far!  I really appreciate it.  As for my
> specific use-case, we're using Bigtable as the final sink, and I'd prefer
> to keep our writes fully idempotent for other reasons (ie no
> read-modify-write).  We actually do track tentative vs final values
> already, but checking that at write-time would impose a pretty big overhead
> in the write path.
>

I think writes to BigTable are made idempotent using row keys. For example,
bundle retries will not result in duplicate data since basically we are
mutating the same row twice. But if you update the same row twice within
the same stage with different values, ordering is not guaranteed.


>
> After this I actually instrumented one of my running pipelines to detect
> these "time traveling" panes, and did see it occurring pretty frequently,
> particularly when dataflow decides to scale up/down the job, so that was
> interesting.
>
> From all this, it seems like using a stateful DoFn to prevent time
> traveling panes from overwriting newer ones is the best solution for now.
>
> My last question / statement is just around general education and
> documentation about this.  I think the fact that PCollection are unordered
> makes sense and is pretty intuitive, but fired panes being delivered
> out-of-order seems very surprising.  I'm curious how many other pipelines
> exist that run into this (and produce incorrect results!) but people are
> unaware of.  Is there a way we can call this behavior out?  For example,
> many of the sample beam projects use early firings, but there's never any
> mention that the output may be out-of-order.
>
> On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
>> wrote:
>> >
>> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
>> to go re-think how we do some of our writes w/ early firings then.
>> >
>> > Are there any workarounds to make things happen in-order in dataflow?
>>  eg if the sink gets fused to the output of the GBK operation, will it
>> always happen effectively in order (per key) even though it's not a
>> guarantee?
>>
>> If things get fused, yes. Note that sinks themselves sometimes have
>> fusion barriers though.
>>
>> > I also guess I could keep track of the last pane index my sink has
>> seen, and ignore earlier ones (using state to keep track)?
>>
>> Yes, that would work.
>>
>> What kind of sink are you using? If it supports read-modify-write or some
>> kind of transaction you may be able to mark early results as tentative
>> (which would be useful anyway) and only overwrite tentative ones.
>>
>>
>> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Correct, even within the same key there's no promise of event time
>> ordering mapping of panes to real time ordering because the downstream
>> operations may happen on a different machine. Multiply triggered windows
>> add an element of non-determinism to the process.
>> >>
>> >> You're also correct that triggering with multiple panes requires lots
>> of care, especially when it comes to operations with side effects (like
>> sinks). Most safe is to only write the final pane to the sink, and handle
>> early triggering in a different way.
>> https://s.apache.org/beam-sink-triggers is a proposal to make this
>> easier to reason about.
>> >>
>> >>
>> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>>
>> >>> Also to clarify here (I re-read this and realized it could be
>> slightly unclear).  My question is only about in-order delivery of panes.
>>  ie: will pane P always be delivered before P+1.
>> >>>
>> >>> I realize the use of "in-order" before could be confusing, I don't
>> care about the ordering of the elements per-se, just the ordering of the
>> pane delivery.
>> >>>
>> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
>> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>> >>>
>> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>>>
>> >>>> Are you also saying also that even in the first example (Source ->
>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>> pretty big "got-cha" for correctness if you ever use accumulating
>> triggering.
>> >>>>
>> >>>> I'd also like to point out I'm not talking about a global ordering
>> across the entire PCollection, I'm talking about within the same key after
>> a GBK transform.
>> >>>>
>> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <
>> robertwb@google.com> wrote:
>> >>>>>
>> >>>>> Due to the nature of distributed processing, order is not
>> preserved. You can, however, inspect the PaneInfo to determine if an
>> element was early, on-time, or late and act accordingly.
>> >>>>>
>> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
>> jcgarciam@gmail.com> wrote:
>> >>>>>>
>> >>>>>> In my experience ordering is not guaranteed, you may need apply a
>> transformation that sort the elements and then dispatch them sorted out.
>> >>>>>>
>> >>>>>> Or uses the Sorter extension for this:
>> >>>>>>
>> >>>>>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> >>>>>>
>> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb.
>> 2019, 16:31:
>> >>>>>>>
>> >>>>>>> Hi everyone, I have some questions I want to ask about how
>> windowing, triggering, and panes work together, and how to ensure
>> correctness throughout a pipeline.
>> >>>>>>>
>> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
>> like:
>> >>>>>>> Source -> CombineByKey (Sum) -> Sink
>> >>>>>>>
>> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>> accumulating panes, this is pretty straight forward.  However, this can get
>> more complicated if we add steps after the CombineByKey, for instance
>> (using the same windowing strategy):
>> >>>>>>>
>> >>>>>>> Say I want to buffer the results of the CombineByKey into batches
>> of N elements.  I can do this with the built-in GroupIntoBatches [1]
>> transform, now my pipeline looks like:
>> >>>>>>>
>> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>> >>>>>>>
>> >>>>>>> This leads to my main question:
>> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>> result from early firing F+1 now comes BEFORE the firing F (because it was
>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>> gets F+1 before F, which means my resulting store has incorrect data
>> (possibly forever if F+1 was the final firing).
>> >>>>>>>
>> >>>>>>> If ordering is not preserved, it seems as if I'd need to
>> introduce my own ordering back in after GroupIntoBatches.  GIB is an
>> example here, but I imagine this could happen with any GBK type operation.
>> >>>>>>>
>> >>>>>>> Am I thinking about this the correct way?  Thanks!
>> >>>>>>>
>> >>>>>>> [1]
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>
>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@apache.org>.
Thanks again for the answers so far!  I really appreciate it.  As for my
specific use-case, we're using Bigtable as the final sink, and I'd prefer
to keep our writes fully idempotent for other reasons (ie no
read-modify-write).  We actually do track tentative vs final values
already, but checking that at write-time would impose a pretty big overhead
in the write path.

After this I actually instrumented one of my running pipelines to detect
these "time traveling" panes, and did see it occurring pretty frequently,
particularly when dataflow decides to scale up/down the job, so that was
interesting.

From all this, it seems like using a stateful DoFn to prevent time
traveling panes from overwriting newer ones is the best solution for now.

My last question / statement is just around general education and
documentation about this.  I think the fact that PCollection are unordered
makes sense and is pretty intuitive, but fired panes being delivered
out-of-order seems very surprising.  I'm curious how many other pipelines
exist that run into this (and produce incorrect results!) but people are
unaware of.  Is there a way we can call this behavior out?  For example,
many of the sample beam projects use early firings, but there's never any
mention that the output may be out-of-order.

On Wed, Feb 13, 2019 at 3:11 AM Robert Bradshaw <ro...@google.com> wrote:

> On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com>
> wrote:
> >
> > wow, thats super unexpected and dangerous, thanks for clarifying!  Time
> to go re-think how we do some of our writes w/ early firings then.
> >
> > Are there any workarounds to make things happen in-order in dataflow?
>  eg if the sink gets fused to the output of the GBK operation, will it
> always happen effectively in order (per key) even though it's not a
> guarantee?
>
> If things get fused, yes. Note that sinks themselves sometimes have fusion
> barriers though.
>
> > I also guess I could keep track of the last pane index my sink has seen,
> and ignore earlier ones (using state to keep track)?
>
> Yes, that would work.
>
> What kind of sink are you using? If it supports read-modify-write or some
> kind of transaction you may be able to mark early results as tentative
> (which would be useful anyway) and only overwrite tentative ones.
>
>
> > On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations may happen on a different machine. Multiply triggered windows
> add an element of non-determinism to the process.
> >>
> >> You're also correct that triggering with multiple panes requires lots
> of care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
> >>
> >>
> >> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
> wrote:
> >>>
> >>> Also to clarify here (I re-read this and realized it could be slightly
> unclear).  My question is only about in-order delivery of panes.  ie: will
> pane P always be delivered before P+1.
> >>>
> >>> I realize the use of "in-order" before could be confusing, I don't
> care about the ordering of the elements per-se, just the ordering of the
> pane delivery.
> >>>
> >>> I want to make sure that given a GBK that produces 3 panes (P0, P1,
> P2) for a key, a downstream PCollection could never see P0, P2, P1.  OR at
> least, the final firing is always guaranteed to be delivered after all
> early-firings (eg we could have P0, P2, P1, but then always PLast).
> >>>
> >>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
> wrote:
> >>>>
> >>>> Are you also saying also that even in the first example (Source ->
> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
> delivered in-order from the Combine -> Sink transforms?  This seems like a
> pretty big "got-cha" for correctness if you ever use accumulating
> triggering.
> >>>>
> >>>> I'd also like to point out I'm not talking about a global ordering
> across the entire PCollection, I'm talking about within the same key after
> a GBK transform.
> >>>>
> >>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>>>
> >>>>> Due to the nature of distributed processing, order is not preserved.
> You can, however, inspect the PaneInfo to determine if an element was
> early, on-time, or late and act accordingly.
> >>>>>
> >>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
> jcgarciam@gmail.com> wrote:
> >>>>>>
> >>>>>> In my experience ordering is not guaranteed, you may need apply a
> transformation that sort the elements and then dispatch them sorted out.
> >>>>>>
> >>>>>> Or uses the Sorter extension for this:
> >>>>>>
> >>>>>>
> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
> >>>>>>
> >>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
> 16:31:
> >>>>>>>
> >>>>>>> Hi everyone, I have some questions I want to ask about how
> windowing, triggering, and panes work together, and how to ensure
> correctness throughout a pipeline.
> >>>>>>>
> >>>>>>> Lets assume I have a very simple streaming pipeline that looks
> like:
> >>>>>>> Source -> CombineByKey (Sum) -> Sink
> >>>>>>>
> >>>>>>> Given fixed windows of 1 hour, early firings every minute, and
> accumulating panes, this is pretty straight forward.  However, this can get
> more complicated if we add steps after the CombineByKey, for instance
> (using the same windowing strategy):
> >>>>>>>
> >>>>>>> Say I want to buffer the results of the CombineByKey into batches
> of N elements.  I can do this with the built-in GroupIntoBatches [1]
> transform, now my pipeline looks like:
> >>>>>>>
> >>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
> >>>>>>>
> >>>>>>> This leads to my main question:
> >>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
> result from early firing F+1 now comes BEFORE the firing F (because it was
> re-ordered in the GroupIntoBatches).  This would mean that the sink then
> gets F+1 before F, which means my resulting store has incorrect data
> (possibly forever if F+1 was the final firing).
> >>>>>>>
> >>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
> my own ordering back in after GroupIntoBatches.  GIB is an example here,
> but I imagine this could happen with any GBK type operation.
> >>>>>>>
> >>>>>>> Am I thinking about this the correct way?  Thanks!
> >>>>>>>
> >>>>>>> [1]
> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>
>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
On Tue, Feb 12, 2019 at 7:38 PM Steve Niemitz <sn...@twitter.com> wrote:
>
> wow, thats super unexpected and dangerous, thanks for clarifying!  Time
to go re-think how we do some of our writes w/ early firings then.
>
> Are there any workarounds to make things happen in-order in dataflow?  eg
if the sink gets fused to the output of the GBK operation, will it always
happen effectively in order (per key) even though it's not a guarantee?

If things get fused, yes. Note that sinks themselves sometimes have fusion
barriers though.

> I also guess I could keep track of the last pane index my sink has seen,
and ignore earlier ones (using state to keep track)?

Yes, that would work.

What kind of sink are you using? If it supports read-modify-write or some
kind of transaction you may be able to mark early results as tentative
(which would be useful anyway) and only overwrite tentative ones.


> On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com>
wrote:
>>
>> Correct, even within the same key there's no promise of event time
ordering mapping of panes to real time ordering because the downstream
operations may happen on a different machine. Multiply triggered windows
add an element of non-determinism to the process.
>>
>> You're also correct that triggering with multiple panes requires lots of
care, especially when it comes to operations with side effects (like
sinks). Most safe is to only write the final pane to the sink, and handle
early triggering in a different way. https://s.apache.org/beam-sink-triggers
is a proposal to make this easier to reason about.
>>
>>
>> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org>
wrote:
>>>
>>> Also to clarify here (I re-read this and realized it could be slightly
unclear).  My question is only about in-order delivery of panes.  ie: will
pane P always be delivered before P+1.
>>>
>>> I realize the use of "in-order" before could be confusing, I don't care
about the ordering of the elements per-se, just the ordering of the pane
delivery.
>>>
>>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2)
for a key, a downstream PCollection could never see P0, P2, P1.  OR at
least, the final firing is always guaranteed to be delivered after all
early-firings (eg we could have P0, P2, P1, but then always PLast).
>>>
>>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
wrote:
>>>>
>>>> Are you also saying also that even in the first example (Source ->
CombineByKey (Sum) -> Sink) there's no guarantee that events would be
delivered in-order from the Combine -> Sink transforms?  This seems like a
pretty big "got-cha" for correctness if you ever use accumulating
triggering.
>>>>
>>>> I'd also like to point out I'm not talking about a global ordering
across the entire PCollection, I'm talking about within the same key after
a GBK transform.
>>>>
>>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
wrote:
>>>>>
>>>>> Due to the nature of distributed processing, order is not preserved.
You can, however, inspect the PaneInfo to determine if an element was
early, on-time, or late and act accordingly.
>>>>>
>>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <
jcgarciam@gmail.com> wrote:
>>>>>>
>>>>>> In my experience ordering is not guaranteed, you may need apply a
transformation that sort the elements and then dispatch them sorted out.
>>>>>>
>>>>>> Or uses the Sorter extension for this:
>>>>>>
>>>>>>
https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>>
>>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
16:31:
>>>>>>>
>>>>>>> Hi everyone, I have some questions I want to ask about how
windowing, triggering, and panes work together, and how to ensure
correctness throughout a pipeline.
>>>>>>>
>>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>>
>>>>>>> Given fixed windows of 1 hour, early firings every minute, and
accumulating panes, this is pretty straight forward.  However, this can get
more complicated if we add steps after the CombineByKey, for instance
(using the same windowing strategy):
>>>>>>>
>>>>>>> Say I want to buffer the results of the CombineByKey into batches
of N elements.  I can do this with the built-in GroupIntoBatches [1]
transform, now my pipeline looks like:
>>>>>>>
>>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>>
>>>>>>> This leads to my main question:
>>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
result from early firing F+1 now comes BEFORE the firing F (because it was
re-ordered in the GroupIntoBatches).  This would mean that the sink then
gets F+1 before F, which means my resulting store has incorrect data
(possibly forever if F+1 was the final firing).
>>>>>>>
>>>>>>> If ordering is not preserved, it seems as if I'd need to introduce
my own ordering back in after GroupIntoBatches.  GIB is an example here,
but I imagine this could happen with any GBK type operation.
>>>>>>>
>>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>>
>>>>>>> [1]
https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html

>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@twitter.com>.
wow, thats super unexpected and dangerous, thanks for clarifying!  Time to
go re-think how we do some of our writes w/ early firings then.

Are there any workarounds to make things happen in-order in dataflow?  eg
if the sink gets fused to the output of the GBK operation, will it always
happen effectively in order (per key) even though it's not a guarantee?  I
also guess I could keep track of the last pane index my sink has seen, and
ignore earlier ones (using state to keep track)?


On Tue, Feb 12, 2019 at 1:28 PM Robert Bradshaw <ro...@google.com> wrote:

> Correct, even within the same key there's no promise of event time
> ordering mapping of panes to real time ordering because the downstream
> operations *may* happen on a different machine. Multiply triggered
> windows add an element of non-determinism to the process.
>
> You're also correct that triggering with multiple panes requires lots of
> care, especially when it comes to operations with side effects (like
> sinks). Most safe is to only write the final pane to the sink, and handle
> early triggering in a different way.
> https://s.apache.org/beam-sink-triggers is a proposal to make this easier
> to reason about.
>
>
> On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> Also to clarify here (I re-read this and realized it could be slightly
>> unclear).  My question is only about in-order delivery of panes.  ie: will
>> pane P always be delivered before P+1.
>>
>> I realize the use of "in-order" before could be confusing, I don't care
>> about the ordering of the elements per-se, just the ordering of the pane
>> delivery.
>>
>> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2)
>> for a key, a downstream PCollection could never see P0, P2, P1.  OR at
>> least, the final firing is always guaranteed to be delivered after all
>> early-firings (eg we could have P0, P2, P1, but then always PLast).
>>
>> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> Are you also saying also that even in the first example (Source ->
>>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>>> pretty big "got-cha" for correctness if you ever use accumulating
>>> triggering.
>>>
>>> I'd also like to point out I'm not talking about a global ordering
>>> across the entire PCollection, I'm talking about within the same key after
>>> a GBK transform.
>>>
>>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Due to the nature of distributed processing, order is not preserved.
>>>> You can, however, inspect the PaneInfo to determine if an element was
>>>> early, on-time, or late and act accordingly.
>>>>
>>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
>>>> wrote:
>>>>
>>>>> In my experience ordering is not guaranteed, you may need apply a
>>>>> transformation that sort the elements and then dispatch them sorted out.
>>>>>
>>>>> Or uses the Sorter extension for this:
>>>>>
>>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>>
>>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
>>>>> 16:31:
>>>>>
>>>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>>>> triggering, and panes work together, and how to ensure correctness
>>>>>> throughout a pipeline.
>>>>>>
>>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>>
>>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>>>> more complicated if we add steps after the CombineByKey, for instance
>>>>>> (using the same windowing strategy):
>>>>>>
>>>>>> Say I want to buffer the results of the CombineByKey into batches of
>>>>>> N elements.  I can do this with the built-in GroupIntoBatches [1]
>>>>>> transform, now my pipeline looks like:
>>>>>>
>>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>>
>>>>>> *This leads to my main question:*
>>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>>>>>> result from early firing F+1 now comes BEFORE the firing F (because it was
>>>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>>>> (possibly forever if F+1 was the final firing).
>>>>>>
>>>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>>>>> imagine this could happen with any GBK type operation.
>>>>>>
>>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>>
>>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
Correct, even within the same key there's no promise of event time ordering
mapping of panes to real time ordering because the downstream operations
*may* happen on a different machine. Multiply triggered windows add an
element of non-determinism to the process.

You're also correct that triggering with multiple panes requires lots of
care, especially when it comes to operations with side effects (like
sinks). Most safe is to only write the final pane to the sink, and handle
early triggering in a different way. https://s.apache.org/beam-sink-triggers
is a proposal to make this easier to reason about.


On Tue, Feb 12, 2019 at 7:19 PM Steve Niemitz <sn...@apache.org> wrote:

> Also to clarify here (I re-read this and realized it could be slightly
> unclear).  My question is only about in-order delivery of panes.  ie: will
> pane P always be delivered before P+1.
>
> I realize the use of "in-order" before could be confusing, I don't care
> about the ordering of the elements per-se, just the ordering of the pane
> delivery.
>
> I want to make sure that given a GBK that produces 3 panes (P0, P1, P2)
> for a key, a downstream PCollection could never see P0, P2, P1.  OR at
> least, the final firing is always guaranteed to be delivered after all
> early-firings (eg we could have P0, P2, P1, but then always PLast).
>
> On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org>
> wrote:
>
>> Are you also saying also that even in the first example (Source ->
>> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
>> delivered in-order from the Combine -> Sink transforms?  This seems like a
>> pretty big "got-cha" for correctness if you ever use accumulating
>> triggering.
>>
>> I'd also like to point out I'm not talking about a global ordering across
>> the entire PCollection, I'm talking about within the same key after a GBK
>> transform.
>>
>> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Due to the nature of distributed processing, order is not preserved. You
>>> can, however, inspect the PaneInfo to determine if an element was early,
>>> on-time, or late and act accordingly.
>>>
>>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
>>> wrote:
>>>
>>>> In my experience ordering is not guaranteed, you may need apply a
>>>> transformation that sort the elements and then dispatch them sorted out.
>>>>
>>>> Or uses the Sorter extension for this:
>>>>
>>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>>
>>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
>>>> 16:31:
>>>>
>>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>>> triggering, and panes work together, and how to ensure correctness
>>>>> throughout a pipeline.
>>>>>
>>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>>> Source -> CombineByKey (Sum) -> Sink
>>>>>
>>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>>> more complicated if we add steps after the CombineByKey, for instance
>>>>> (using the same windowing strategy):
>>>>>
>>>>> Say I want to buffer the results of the CombineByKey into batches of N
>>>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>>>> now my pipeline looks like:
>>>>>
>>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>>
>>>>> *This leads to my main question:*
>>>>> Is ordering preserved somehow here?  ie: is it possible that the
>>>>> result from early firing F+1 now comes BEFORE the firing F (because it was
>>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>>> (possibly forever if F+1 was the final firing).
>>>>>
>>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>>>> imagine this could happen with any GBK type operation.
>>>>>
>>>>> Am I thinking about this the correct way?  Thanks!
>>>>>
>>>>> [1]
>>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>>
>>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@apache.org>.
Also to clarify here (I re-read this and realized it could be slightly
unclear).  My question is only about in-order delivery of panes.  ie: will
pane P always be delivered before P+1.

I realize the use of "in-order" before could be confusing, I don't care
about the ordering of the elements per-se, just the ordering of the pane
delivery.

I want to make sure that given a GBK that produces 3 panes (P0, P1, P2) for
a key, a downstream PCollection could never see P0, P2, P1.  OR at least,
the final firing is always guaranteed to be delivered after all
early-firings (eg we could have P0, P2, P1, but then always PLast).

On Tue, Feb 12, 2019 at 11:48 AM Steve Niemitz <sn...@apache.org> wrote:

> Are you also saying also that even in the first example (Source ->
> CombineByKey (Sum) -> Sink) there's no guarantee that events would be
> delivered in-order from the Combine -> Sink transforms?  This seems like a
> pretty big "got-cha" for correctness if you ever use accumulating
> triggering.
>
> I'd also like to point out I'm not talking about a global ordering across
> the entire PCollection, I'm talking about within the same key after a GBK
> transform.
>
> On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Due to the nature of distributed processing, order is not preserved. You
>> can, however, inspect the PaneInfo to determine if an element was early,
>> on-time, or late and act accordingly.
>>
>> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
>> wrote:
>>
>>> In my experience ordering is not guaranteed, you may need apply a
>>> transformation that sort the elements and then dispatch them sorted out.
>>>
>>> Or uses the Sorter extension for this:
>>>
>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>>
>>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019,
>>> 16:31:
>>>
>>>> Hi everyone, I have some questions I want to ask about how windowing,
>>>> triggering, and panes work together, and how to ensure correctness
>>>> throughout a pipeline.
>>>>
>>>> Lets assume I have a very simple streaming pipeline that looks like:
>>>> Source -> CombineByKey (Sum) -> Sink
>>>>
>>>> Given fixed windows of 1 hour, early firings every minute, and
>>>> accumulating panes, this is pretty straight forward.  However, this can get
>>>> more complicated if we add steps after the CombineByKey, for instance
>>>> (using the same windowing strategy):
>>>>
>>>> Say I want to buffer the results of the CombineByKey into batches of N
>>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>>> now my pipeline looks like:
>>>>
>>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>>
>>>> *This leads to my main question:*
>>>> Is ordering preserved somehow here?  ie: is it possible that the result
>>>> from early firing F+1 now comes BEFORE the firing F (because it was
>>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>>> gets F+1 before F, which means my resulting store has incorrect data
>>>> (possibly forever if F+1 was the final firing).
>>>>
>>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>>> imagine this could happen with any GBK type operation.
>>>>
>>>> Am I thinking about this the correct way?  Thanks!
>>>>
>>>> [1]
>>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>>
>>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Steve Niemitz <sn...@apache.org>.
Are you also saying also that even in the first example (Source ->
CombineByKey (Sum) -> Sink) there's no guarantee that events would be
delivered in-order from the Combine -> Sink transforms?  This seems like a
pretty big "got-cha" for correctness if you ever use accumulating
triggering.

I'd also like to point out I'm not talking about a global ordering across
the entire PCollection, I'm talking about within the same key after a GBK
transform.

On Tue, Feb 12, 2019 at 11:35 AM Robert Bradshaw <ro...@google.com>
wrote:

> Due to the nature of distributed processing, order is not preserved. You
> can, however, inspect the PaneInfo to determine if an element was early,
> on-time, or late and act accordingly.
>
> On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
> wrote:
>
>> In my experience ordering is not guaranteed, you may need apply a
>> transformation that sort the elements and then dispatch them sorted out.
>>
>> Or uses the Sorter extension for this:
>>
>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>
>> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019, 16:31:
>>
>>> Hi everyone, I have some questions I want to ask about how windowing,
>>> triggering, and panes work together, and how to ensure correctness
>>> throughout a pipeline.
>>>
>>> Lets assume I have a very simple streaming pipeline that looks like:
>>> Source -> CombineByKey (Sum) -> Sink
>>>
>>> Given fixed windows of 1 hour, early firings every minute, and
>>> accumulating panes, this is pretty straight forward.  However, this can get
>>> more complicated if we add steps after the CombineByKey, for instance
>>> (using the same windowing strategy):
>>>
>>> Say I want to buffer the results of the CombineByKey into batches of N
>>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>>> now my pipeline looks like:
>>>
>>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>>
>>> *This leads to my main question:*
>>> Is ordering preserved somehow here?  ie: is it possible that the result
>>> from early firing F+1 now comes BEFORE the firing F (because it was
>>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>>> gets F+1 before F, which means my resulting store has incorrect data
>>> (possibly forever if F+1 was the final firing).
>>>
>>> If ordering is not preserved, it seems as if I'd need to introduce my
>>> own ordering back in after GroupIntoBatches.  GIB is an example here, but I
>>> imagine this could happen with any GBK type operation.
>>>
>>> Am I thinking about this the correct way?  Thanks!
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>>
>>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Robert Bradshaw <ro...@google.com>.
Due to the nature of distributed processing, order is not preserved. You
can, however, inspect the PaneInfo to determine if an element was early,
on-time, or late and act accordingly.

On Tue, Feb 12, 2019 at 5:15 PM Juan Carlos Garcia <jc...@gmail.com>
wrote:

> In my experience ordering is not guaranteed, you may need apply a
> transformation that sort the elements and then dispatch them sorted out.
>
> Or uses the Sorter extension for this:
>
> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>
> Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019, 16:31:
>
>> Hi everyone, I have some questions I want to ask about how windowing,
>> triggering, and panes work together, and how to ensure correctness
>> throughout a pipeline.
>>
>> Lets assume I have a very simple streaming pipeline that looks like:
>> Source -> CombineByKey (Sum) -> Sink
>>
>> Given fixed windows of 1 hour, early firings every minute, and
>> accumulating panes, this is pretty straight forward.  However, this can get
>> more complicated if we add steps after the CombineByKey, for instance
>> (using the same windowing strategy):
>>
>> Say I want to buffer the results of the CombineByKey into batches of N
>> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
>> now my pipeline looks like:
>>
>> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>>
>> *This leads to my main question:*
>> Is ordering preserved somehow here?  ie: is it possible that the result
>> from early firing F+1 now comes BEFORE the firing F (because it was
>> re-ordered in the GroupIntoBatches).  This would mean that the sink then
>> gets F+1 before F, which means my resulting store has incorrect data
>> (possibly forever if F+1 was the final firing).
>>
>> If ordering is not preserved, it seems as if I'd need to introduce my own
>> ordering back in after GroupIntoBatches.  GIB is an example here, but I
>> imagine this could happen with any GBK type operation.
>>
>> Am I thinking about this the correct way?  Thanks!
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>>
>

Re: Some questions about ensuring correctness with windowing and triggering

Posted by Juan Carlos Garcia <jc...@gmail.com>.
In my experience ordering is not guaranteed, you may need apply a
transformation that sort the elements and then dispatch them sorted out.

Or uses the Sorter extension for this:

https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter

Steve Niemitz <sn...@apache.org> schrieb am Di., 12. Feb. 2019, 16:31:

> Hi everyone, I have some questions I want to ask about how windowing,
> triggering, and panes work together, and how to ensure correctness
> throughout a pipeline.
>
> Lets assume I have a very simple streaming pipeline that looks like:
> Source -> CombineByKey (Sum) -> Sink
>
> Given fixed windows of 1 hour, early firings every minute, and
> accumulating panes, this is pretty straight forward.  However, this can get
> more complicated if we add steps after the CombineByKey, for instance
> (using the same windowing strategy):
>
> Say I want to buffer the results of the CombineByKey into batches of N
> elements.  I can do this with the built-in GroupIntoBatches [1] transform,
> now my pipeline looks like:
>
> Source -> CombineByKey (Sum) -> GroupIntoBatches -> Sink
>
> *This leads to my main question:*
> Is ordering preserved somehow here?  ie: is it possible that the result
> from early firing F+1 now comes BEFORE the firing F (because it was
> re-ordered in the GroupIntoBatches).  This would mean that the sink then
> gets F+1 before F, which means my resulting store has incorrect data
> (possibly forever if F+1 was the final firing).
>
> If ordering is not preserved, it seems as if I'd need to introduce my own
> ordering back in after GroupIntoBatches.  GIB is an example here, but I
> imagine this could happen with any GBK type operation.
>
> Am I thinking about this the correct way?  Thanks!
>
> [1]
> https://beam.apache.org/releases/javadoc/2.10.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html
>