You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by David Morávek <dm...@apache.org> on 2020/05/28 15:02:51 UTC

dealing with late data output timestamps

Hi,

I've came across "unexpected" model behaviour when dealing with late data
and custom timestamp combiners. Let's take a following pipeline as an
example:

final PCollection<String> input = ...;
input.apply(
      "GlobalWindows",
      Window.<String>into(new GlobalWindows())
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.standardSeconds(10))))
          .withTimestampCombiner(TimestampCombiner.LATEST)
          .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
          .accumulatingFiredPanes())
  .apply("Aggregate", Count.perElement())

The above pipeline emits updates with the latest input timestamp it has
seen so far (from non-late elements). We write the output from this
timestamp to kafka and read it from another pipeline.

Problem comes when we need to handle late elements behind output watermark.
In this case beam can not use combined timestamp and uses EOW timestamp
instead. Unfortunately this results in downstream pipeline progressing it's
input watermark to end of global window. Also if we would use fixed windows
after this aggregation, it would yield unexpected results.

There is no reasoning about this behaviour in the last section of lateness
design doc <https://s.apache.org/beam-lateness> [1], so I'd like to open a
discussion about what the expected result should be.

My personal opinion is, that correct approach would be emitting late
elements with currentOutputWatermark rather than EOW in case of EARLIEST
and LATEST timestamp combiners.

I've prepared a faling test case for ReduceFnRunner
<https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
if anyone wants to play around with the issue.

I also think that BEAM-2262
<https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
this discussion.

[1] https://s.apache.org/beam-lateness
[2] https://issues.apache.org/jira/browse/BEAM-2262
[3]
https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b

Looking forward to hearing your thoughts.

Thanks,
D.

Re: dealing with late data output timestamps

Posted by Kenneth Knowles <ke...@apache.org>.
See https://s.apache.org/beam-lateness for detailed rationale about where
the holds end up. It is a pretty massive read, but at this point I think
even the details there are relevant.

TL;DR is that the hold policy:

 - never makes on time data late
 - never makes non-droppable data droppable
 - never holds up downstream processing more than required

I don't believe there is another policy that achieves these.

Kenn

On Tue, Jun 2, 2020 at 6:57 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> agree that for on-time elements, the hold has to respect the output
> timestamps. For already late elements, it should be possible to calculate
> the output timestamp independently. Currently, the output watermark is
> *always* the value of watermark hold, which might be inappropriate for
> cases when the hold is explicitly cleared (because input is late and the
> hold is cleared in order not to hold downstream processing). My proposal
> would be to update the hold to input watermark (or more precisely to result
> of TimestampCombiner.assign(window, inputWatermark)), which seems to work,
> although it might hold watermark in cases when watermark updates even when
> no data arrives (I'm not sure how exactly to solve this). Another way
> around would be to clearly separate output timestamp (store it in different
> state than the watermark hold), because the output watermark should be
> cleared on different condition (hold clears after every firing to enable
> watemark progress, while output timestamp can be kept in case of
> accumulating panes).
>
> Jan
> On 6/2/20 1:42 AM, Kenneth Knowles wrote:
>
> Quick reply about one top-level thing: output timestamps and watermark
> holds are closely related. A hold is precisely reserving the right to
> output at a particular time. The part that is unintuitive is that these are
> ever different. That is, really, a hack to allow downstream processing to
> proceed more quickly when input is already late. I wonder if there is
> another way to set up the requirements so that they are always the same
> value, but the downstream on time processing is not held up by late data.
>
>
>
> On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Minor self-correction - in the property c) it MIGHT be possible to update
>> output watermark to time greater than input watermark, _as long as any
>> future element cannot be assigned timestamp that is less than the output
>> watermark_. That seems to be the case only for
>> TimestampCombiner.END_OF_WINDOW, as that actually does not depend on
>> timestamps of the actual elements. This doesn't quite change the reasoning
>> below, it might be possible to not store input watermark to watermark hold
>> for this combiner, although it would probably have negligible practical
>> impact.
>> On 5/31/20 12:17 PM, Jan Lukavský wrote:
>>
>> Hi Reuven,
>>
>> the asynchronicity of watermark update is what I was missing - it is what
>> relates watermarkhold with element output timestamp. On the other hand, we
>> have some invariants that have to hold, namely:
>>
>>  a) element arriving as non-late MUST NOT be changed to late
>>
>>  b) element arriving as late MIGHT be changed to non-late
>>
>>  c) operator's output watermark MUST be less than input watermark at any
>> time
>>
>> Properties a) and b) are somewhat natural requirements and property c)
>> follows the fact, that it is impossible to exactly predict future.
>>
>> Now, having these three properties, would it be possible to:
>>
>>  a) when pane contains both late and on-time elements, split the pane
>> into two, containing only late and on-time elements
>>
>>  b) calculate output timestamp of all panes using timestamp combiner (now
>> pane contains only late or on time elements, so no timestamp combiner
>> should be able to violate neither of properties a) or b))
>>
>>  c) calculate when there is pane that contains only late elements, update
>> watermark hold to min(current input watermark, window gc time) - so that
>> the output watermark can progress up to input watermark (and not violate
>> property c) above)
>>
>> I seems to me that what currently stands in the way is that
>>
>>  a) panes are not split to late and non-late only (and this might be
>> tricky, mostly for combining transforms)
>>
>>  b) the watermark hold with late-only pane is set to window gc time
>> (instead of adding the input watermark as well) - [1]
>>
>> With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting
>> the panes would not be required, as the timestamp combiner can only shift
>> late elements forward (make use of property b)). TimestampCombiner.EARLIEST
>> would probably require splitting the panes, which seems to solve the
>> mentioned [BEAM-2262].
>>
>> WDYT?
>>
>> [1]
>> https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
>> On 5/29/20 5:01 PM, Reuven Lax wrote:
>>
>> This does seem non intuitive, though I'm not sure what the best approach
>> is.
>>
>> The problem with using currentOutputWatermark as the output timestamp is
>> that Beam does not define watermark advancement to be synchronous, and at
>> least the Dataflow runner calculates watermarks completely independent of
>> bundle processing. This means that the output watermark could advance
>> immediately after being checked, which would cause the records output to be
>> arbitrarily late. So for example, if allowedLateness is 10 seconds, then
>> this trigger will accept a record that is 5 seconds late. However if
>> currentOutputWaternark advances by 15 seconds after checking it, then you
>> would end up outputting a result that is 15 seconds late and therefore
>> would be dropped.
>>
>> IMO it's most important that on-time elements are never turned into late.
>> elements. However the above behavior also seems confusing to users.
>>
>> Worth noting that I don't think that the current behavior is that much
>> better. If the output watermark is close to the end of the window, then I
>> think the existing model can also cause this scenario to happen.
>>
>> Reuven
>>
>> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> what seems the most "surprising" to me is that we are using
>>> TimestampCombiners to actually do two (orthogonal) things:
>>>
>>>  a) calculate a watermark hold for a window, so on-time elements emitted
>>> from a pane are not late in downstream processing
>>>
>>>  b) calculate timestamp of elements in output pane
>>>
>>> These two follow a little different constraints - while in case a) it is
>>> not allowed to shift watermark "back in time" in case b) it seems OK to
>>> output data with timestamp lower than output watermark (what comes late,
>>> might leave late). So, while it seems OK to discard late elements for the
>>> sake of calculation output watermark, it seems wrong to discard them when
>>> calculating output timestamp. Maybe these two timestamps might be held in
>>> different states (the state will be held until GC time for accumulating
>>> panes and reset on discarding panes)?
>>>
>>> Jan
>>> On 5/28/20 5:02 PM, David Morávek wrote:
>>>
>>> Hi,
>>>
>>> I've came across "unexpected" model behaviour when dealing with late
>>> data and custom timestamp combiners. Let's take a following pipeline as an
>>> example:
>>>
>>> final PCollection<String> input = ...;
>>> input.apply(
>>>       "GlobalWindows",
>>>       Window.<String>into(new GlobalWindows())
>>>           .triggering(
>>>               AfterWatermark.pastEndOfWindow()
>>>                   .withEarlyFirings(
>>>                       AfterProcessingTime.pastFirstElementInPane()
>>>                           .plusDelayOf(Duration.standardSeconds(10))))
>>>           .withTimestampCombiner(TimestampCombiner.LATEST)
>>>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>>           .accumulatingFiredPanes())
>>>   .apply("Aggregate", Count.perElement())
>>>
>>> The above pipeline emits updates with the latest input timestamp it has
>>> seen so far (from non-late elements). We write the output from this
>>> timestamp to kafka and read it from another pipeline.
>>>
>>> Problem comes when we need to handle late elements behind output
>>> watermark. In this case beam can not use combined timestamp and uses EOW
>>> timestamp instead. Unfortunately this results in downstream pipeline
>>> progressing it's input watermark to end of global window. Also if we would
>>> use fixed windows after this aggregation, it would yield unexpected results.
>>>
>>> There is no reasoning about this behaviour in the last section of lateness
>>> design doc <https://s.apache.org/beam-lateness> [1], so I'd like to
>>> open a discussion about what the expected result should be.
>>>
>>> My personal opinion is, that correct approach would be emitting late
>>> elements with currentOutputWatermark rather than EOW in case of EARLIEST
>>> and LATEST timestamp combiners.
>>>
>>> I've prepared a faling test case for ReduceFnRunner
>>> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>>> if anyone wants to play around with the issue.
>>>
>>> I also think that BEAM-2262
>>> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
>>> this discussion.
>>>
>>> [1] https://s.apache.org/beam-lateness
>>> [2] https://issues.apache.org/jira/browse/BEAM-2262
>>> [3]
>>> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>>
>>> Looking forward to hearing your thoughts.
>>>
>>> Thanks,
>>> D.
>>>
>>>

Re: dealing with late data output timestamps

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

agree that for on-time elements, the hold has to respect the output 
timestamps. For already late elements, it should be possible to 
calculate the output timestamp independently. Currently, the output 
watermark is *always* the value of watermark hold, which might be 
inappropriate for cases when the hold is explicitly cleared (because 
input is late and the hold is cleared in order not to hold downstream 
processing). My proposal would be to update the hold to input watermark 
(or more precisely to result of TimestampCombiner.assign(window, 
inputWatermark)), which seems to work, although it might hold watermark 
in cases when watermark updates even when no data arrives (I'm not sure 
how exactly to solve this). Another way around would be to clearly 
separate output timestamp (store it in different state than the 
watermark hold), because the output watermark should be cleared on 
different condition (hold clears after every firing to enable watemark 
progress, while output timestamp can be kept in case of accumulating panes).

Jan

On 6/2/20 1:42 AM, Kenneth Knowles wrote:
> Quick reply about one top-level thing: output timestamps and watermark 
> holds are closely related. A hold is precisely reserving the right to 
> output at a particular time. The part that is unintuitive is that 
> these are ever different. That is, really, a hack to allow downstream 
> processing to proceed more quickly when input is already late. I 
> wonder if there is another way to set up the requirements so that they 
> are always the same value, but the downstream on time processing is 
> not held up by late data.
>
>
>
> On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Minor self-correction - in the property c) it MIGHT be possible to
>     update output watermark to time greater than input watermark, _as
>     long as any future element cannot be assigned timestamp that is
>     less than the output watermark_. That seems to be the case only
>     for TimestampCombiner.END_OF_WINDOW, as that actually does not
>     depend on timestamps of the actual elements. This doesn't quite
>     change the reasoning below, it might be possible to not store
>     input watermark to watermark hold for this combiner, although it
>     would probably have negligible practical impact.
>
>     On 5/31/20 12:17 PM, Jan Lukavský wrote:
>>
>>     Hi Reuven,
>>
>>     the asynchronicity of watermark update is what I was missing - it
>>     is what relates watermarkhold with element output timestamp. On
>>     the other hand, we have some invariants that have to hold, namely:
>>
>>      a) element arriving as non-late MUST NOT be changed to late
>>
>>      b) element arriving as late MIGHT be changed to non-late
>>
>>      c) operator's output watermark MUST be less than input watermark
>>     at any time
>>
>>     Properties a) and b) are somewhat natural requirements and
>>     property c) follows the fact, that it is impossible to exactly
>>     predict future.
>>
>>     Now, having these three properties, would it be possible to:
>>
>>      a) when pane contains both late and on-time elements, split the
>>     pane into two, containing only late and on-time elements
>>
>>      b) calculate output timestamp of all panes using timestamp
>>     combiner (now pane contains only late or on time elements, so no
>>     timestamp combiner should be able to violate neither of
>>     properties a) or b))
>>
>>      c) calculate when there is pane that contains only late
>>     elements, update watermark hold to min(current input watermark,
>>     window gc time) - so that the output watermark can progress up to
>>     input watermark (and not violate property c) above)
>>
>>     I seems to me that what currently stands in the way is that
>>
>>      a) panes are not split to late and non-late only (and this might
>>     be tricky, mostly for combining transforms)
>>
>>      b) the watermark hold with late-only pane is set to window gc
>>     time (instead of adding the input watermark as well) - [1]
>>
>>     With TimestampCombiner.LATEST and END_OF_WINDOW it seems that
>>     splitting the panes would not be required, as the timestamp
>>     combiner can only shift late elements forward (make use of
>>     property b)). TimestampCombiner.EARLIEST would probably require
>>     splitting the panes, which seems to solve the mentioned [BEAM-2262].
>>
>>     WDYT?
>>
>>     [1]
>>     https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
>>
>>     On 5/29/20 5:01 PM, Reuven Lax wrote:
>>>     This does seem non intuitive, though I'm not sure what the best
>>>     approach is.
>>>
>>>     The problem with using currentOutputWatermark as the output
>>>     timestamp is that Beam does not define watermark advancement to
>>>     be synchronous, and at least the Dataflow runner calculates
>>>     watermarks completely independent of bundle processing. This
>>>     means that the output watermark could advance immediately after
>>>     being checked, which would cause the records output to be
>>>     arbitrarily late. So for example, if allowedLateness is 10
>>>     seconds, then this trigger will accept a record that is 5
>>>     seconds late. However if currentOutputWaternark advances by 15
>>>     seconds after checking it, then you would end up outputting a
>>>     result that is 15 seconds late and therefore would be dropped.
>>>
>>>     IMO it's most important that on-time elements are never turned
>>>     into late. elements. However the above behavior also seems
>>>     confusing to users.
>>>
>>>     Worth noting that I don't think that the current behavior is
>>>     that much better. If the output watermark is close to the end of
>>>     the window, then I think the existing model can also cause this
>>>     scenario to happen.
>>>
>>>     Reuven
>>>
>>>     On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je.ik@seznam.cz
>>>     <ma...@seznam.cz>> wrote:
>>>
>>>         Hi,
>>>
>>>         what seems the most "surprising" to me is that we are using
>>>         TimestampCombiners to actually do two (orthogonal) things:
>>>
>>>          a) calculate a watermark hold for a window, so on-time
>>>         elements emitted from a pane are not late in downstream
>>>         processing
>>>
>>>          b) calculate timestamp of elements in output pane
>>>
>>>         These two follow a little different constraints - while in
>>>         case a) it is not allowed to shift watermark "back in time"
>>>         in case b) it seems OK to output data with timestamp lower
>>>         than output watermark (what comes late, might leave late).
>>>         So, while it seems OK to discard late elements for the sake
>>>         of calculation output watermark, it seems wrong to discard
>>>         them when calculating output timestamp. Maybe these two
>>>         timestamps might be held in different states (the state will
>>>         be held until GC time for accumulating panes and reset on
>>>         discarding panes)?
>>>
>>>         Jan
>>>
>>>         On 5/28/20 5:02 PM, David Morávek wrote:
>>>>         Hi,
>>>>
>>>>         I've came across "unexpected" model behaviour when dealing
>>>>         with late data and custom timestamp combiners. Let's take a
>>>>         following pipeline as an example:
>>>>
>>>>         final PCollection<String> input = ...;
>>>>         input.apply(
>>>>               "GlobalWindows",
>>>>               Window.<String>into(new GlobalWindows())
>>>>                   .triggering(
>>>>         AfterWatermark.pastEndOfWindow()
>>>>                           .withEarlyFirings(
>>>>         AfterProcessingTime.pastFirstElementInPane()
>>>>         .plusDelayOf(Duration.standardSeconds(10))))
>>>>         .withTimestampCombiner(TimestampCombiner.LATEST)
>>>>         .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>>>                   .accumulatingFiredPanes())
>>>>           .apply("Aggregate", Count.perElement())
>>>>
>>>>         The above pipeline emits updates with the latest input
>>>>         timestamp it has seen so far (from non-late elements). We
>>>>         write the output from this timestamp to kafka and read it
>>>>         from another pipeline.
>>>>
>>>>         Problem comes when we need to handle late elements behind
>>>>         output watermark. In this case beam can not use combined
>>>>         timestamp and uses EOW timestamp instead. Unfortunately
>>>>         this results in downstream pipeline progressing it's input
>>>>         watermark to end of global window. Also if we would use
>>>>         fixed windows after this aggregation, it would yield
>>>>         unexpected results.
>>>>
>>>>         There is no reasoning about this behaviour in the last
>>>>         section of lateness design doc
>>>>         <https://s.apache.org/beam-lateness> [1], so I'd like to
>>>>         open a discussion about what the expected result should be.
>>>>
>>>>         My personal opinion is, that correct approach would be
>>>>         emitting late elements with currentOutputWatermark rather
>>>>         than EOW in case of EARLIEST and LATEST timestamp combiners.
>>>>
>>>>         I've prepared a faling test case for ReduceFnRunner
>>>>         <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>>>>         if anyone wants to play around with the issue.
>>>>
>>>>         I also think that BEAM-2262
>>>>         <https://issues.apache.org/jira/browse/BEAM-2262> [2] may
>>>>         be related to this discussion.
>>>>
>>>>         [1] https://s.apache.org/beam-lateness
>>>>         [2] https://issues.apache.org/jira/browse/BEAM-2262
>>>>         [3]
>>>>         https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>>>
>>>>         Looking forward to hearing your thoughts.
>>>>
>>>>         Thanks,
>>>>         D.
>>>

Re: dealing with late data output timestamps

Posted by Kenneth Knowles <ke...@apache.org>.
Quick reply about one top-level thing: output timestamps and watermark
holds are closely related. A hold is precisely reserving the right to
output at a particular time. The part that is unintuitive is that these are
ever different. That is, really, a hack to allow downstream processing to
proceed more quickly when input is already late. I wonder if there is
another way to set up the requirements so that they are always the same
value, but the downstream on time processing is not held up by late data.



On Sun, May 31, 2020 at 3:44 AM Jan Lukavský <je...@seznam.cz> wrote:

> Minor self-correction - in the property c) it MIGHT be possible to update
> output watermark to time greater than input watermark, _as long as any
> future element cannot be assigned timestamp that is less than the output
> watermark_. That seems to be the case only for
> TimestampCombiner.END_OF_WINDOW, as that actually does not depend on
> timestamps of the actual elements. This doesn't quite change the reasoning
> below, it might be possible to not store input watermark to watermark hold
> for this combiner, although it would probably have negligible practical
> impact.
> On 5/31/20 12:17 PM, Jan Lukavský wrote:
>
> Hi Reuven,
>
> the asynchronicity of watermark update is what I was missing - it is what
> relates watermarkhold with element output timestamp. On the other hand, we
> have some invariants that have to hold, namely:
>
>  a) element arriving as non-late MUST NOT be changed to late
>
>  b) element arriving as late MIGHT be changed to non-late
>
>  c) operator's output watermark MUST be less than input watermark at any
> time
>
> Properties a) and b) are somewhat natural requirements and property c)
> follows the fact, that it is impossible to exactly predict future.
>
> Now, having these three properties, would it be possible to:
>
>  a) when pane contains both late and on-time elements, split the pane into
> two, containing only late and on-time elements
>
>  b) calculate output timestamp of all panes using timestamp combiner (now
> pane contains only late or on time elements, so no timestamp combiner
> should be able to violate neither of properties a) or b))
>
>  c) calculate when there is pane that contains only late elements, update
> watermark hold to min(current input watermark, window gc time) - so that
> the output watermark can progress up to input watermark (and not violate
> property c) above)
>
> I seems to me that what currently stands in the way is that
>
>  a) panes are not split to late and non-late only (and this might be
> tricky, mostly for combining transforms)
>
>  b) the watermark hold with late-only pane is set to window gc time
> (instead of adding the input watermark as well) - [1]
>
> With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting
> the panes would not be required, as the timestamp combiner can only shift
> late elements forward (make use of property b)). TimestampCombiner.EARLIEST
> would probably require splitting the panes, which seems to solve the
> mentioned [BEAM-2262].
>
> WDYT?
>
> [1]
> https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
> On 5/29/20 5:01 PM, Reuven Lax wrote:
>
> This does seem non intuitive, though I'm not sure what the best approach
> is.
>
> The problem with using currentOutputWatermark as the output timestamp is
> that Beam does not define watermark advancement to be synchronous, and at
> least the Dataflow runner calculates watermarks completely independent of
> bundle processing. This means that the output watermark could advance
> immediately after being checked, which would cause the records output to be
> arbitrarily late. So for example, if allowedLateness is 10 seconds, then
> this trigger will accept a record that is 5 seconds late. However if
> currentOutputWaternark advances by 15 seconds after checking it, then you
> would end up outputting a result that is 15 seconds late and therefore
> would be dropped.
>
> IMO it's most important that on-time elements are never turned into late.
> elements. However the above behavior also seems confusing to users.
>
> Worth noting that I don't think that the current behavior is that much
> better. If the output watermark is close to the end of the window, then I
> think the existing model can also cause this scenario to happen.
>
> Reuven
>
> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> what seems the most "surprising" to me is that we are using
>> TimestampCombiners to actually do two (orthogonal) things:
>>
>>  a) calculate a watermark hold for a window, so on-time elements emitted
>> from a pane are not late in downstream processing
>>
>>  b) calculate timestamp of elements in output pane
>>
>> These two follow a little different constraints - while in case a) it is
>> not allowed to shift watermark "back in time" in case b) it seems OK to
>> output data with timestamp lower than output watermark (what comes late,
>> might leave late). So, while it seems OK to discard late elements for the
>> sake of calculation output watermark, it seems wrong to discard them when
>> calculating output timestamp. Maybe these two timestamps might be held in
>> different states (the state will be held until GC time for accumulating
>> panes and reset on discarding panes)?
>>
>> Jan
>> On 5/28/20 5:02 PM, David Morávek wrote:
>>
>> Hi,
>>
>> I've came across "unexpected" model behaviour when dealing with late data
>> and custom timestamp combiners. Let's take a following pipeline as an
>> example:
>>
>> final PCollection<String> input = ...;
>> input.apply(
>>       "GlobalWindows",
>>       Window.<String>into(new GlobalWindows())
>>           .triggering(
>>               AfterWatermark.pastEndOfWindow()
>>                   .withEarlyFirings(
>>                       AfterProcessingTime.pastFirstElementInPane()
>>                           .plusDelayOf(Duration.standardSeconds(10))))
>>           .withTimestampCombiner(TimestampCombiner.LATEST)
>>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>           .accumulatingFiredPanes())
>>   .apply("Aggregate", Count.perElement())
>>
>> The above pipeline emits updates with the latest input timestamp it has
>> seen so far (from non-late elements). We write the output from this
>> timestamp to kafka and read it from another pipeline.
>>
>> Problem comes when we need to handle late elements behind output
>> watermark. In this case beam can not use combined timestamp and uses EOW
>> timestamp instead. Unfortunately this results in downstream pipeline
>> progressing it's input watermark to end of global window. Also if we would
>> use fixed windows after this aggregation, it would yield unexpected results.
>>
>> There is no reasoning about this behaviour in the last section of lateness
>> design doc <https://s.apache.org/beam-lateness> [1], so I'd like to open
>> a discussion about what the expected result should be.
>>
>> My personal opinion is, that correct approach would be emitting late
>> elements with currentOutputWatermark rather than EOW in case of EARLIEST
>> and LATEST timestamp combiners.
>>
>> I've prepared a faling test case for ReduceFnRunner
>> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>> if anyone wants to play around with the issue.
>>
>> I also think that BEAM-2262
>> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
>> this discussion.
>>
>> [1] https://s.apache.org/beam-lateness
>> [2] https://issues.apache.org/jira/browse/BEAM-2262
>> [3]
>> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>
>> Looking forward to hearing your thoughts.
>>
>> Thanks,
>> D.
>>
>>

Re: dealing with late data output timestamps

Posted by Jan Lukavský <je...@seznam.cz>.
Minor self-correction - in the property c) it MIGHT be possible to 
update output watermark to time greater than input watermark, _as long 
as any future element cannot be assigned timestamp that is less than the 
output watermark_. That seems to be the case only for 
TimestampCombiner.END_OF_WINDOW, as that actually does not depend on 
timestamps of the actual elements. This doesn't quite change the 
reasoning below, it might be possible to not store input watermark to 
watermark hold for this combiner, although it would probably have 
negligible practical impact.

On 5/31/20 12:17 PM, Jan Lukavský wrote:
>
> Hi Reuven,
>
> the asynchronicity of watermark update is what I was missing - it is 
> what relates watermarkhold with element output timestamp. On the other 
> hand, we have some invariants that have to hold, namely:
>
>  a) element arriving as non-late MUST NOT be changed to late
>
>  b) element arriving as late MIGHT be changed to non-late
>
>  c) operator's output watermark MUST be less than input watermark at 
> any time
>
> Properties a) and b) are somewhat natural requirements and property c) 
> follows the fact, that it is impossible to exactly predict future.
>
> Now, having these three properties, would it be possible to:
>
>  a) when pane contains both late and on-time elements, split the pane 
> into two, containing only late and on-time elements
>
>  b) calculate output timestamp of all panes using timestamp combiner 
> (now pane contains only late or on time elements, so no timestamp 
> combiner should be able to violate neither of properties a) or b))
>
>  c) calculate when there is pane that contains only late elements, 
> update watermark hold to min(current input watermark, window gc time) 
> - so that the output watermark can progress up to input watermark (and 
> not violate property c) above)
>
> I seems to me that what currently stands in the way is that
>
>  a) panes are not split to late and non-late only (and this might be 
> tricky, mostly for combining transforms)
>
>  b) the watermark hold with late-only pane is set to window gc time 
> (instead of adding the input watermark as well) - [1]
>
> With TimestampCombiner.LATEST and END_OF_WINDOW it seems that 
> splitting the panes would not be required, as the timestamp combiner 
> can only shift late elements forward (make use of property b)). 
> TimestampCombiner.EARLIEST would probably require splitting the panes, 
> which seems to solve the mentioned [BEAM-2262].
>
> WDYT?
>
> [1] 
> https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f
>
> On 5/29/20 5:01 PM, Reuven Lax wrote:
>> This does seem non intuitive, though I'm not sure what the best 
>> approach is.
>>
>> The problem with using currentOutputWatermark as the output timestamp 
>> is that Beam does not define watermark advancement to be synchronous, 
>> and at least the Dataflow runner calculates watermarks completely 
>> independent of bundle processing. This means that the output 
>> watermark could advance immediately after being checked, which would 
>> cause the records output to be arbitrarily late. So for example, if 
>> allowedLateness is 10 seconds, then this trigger will accept a record 
>> that is 5 seconds late. However if currentOutputWaternark advances by 
>> 15 seconds after checking it, then you would end up outputting a 
>> result that is 15 seconds late and therefore would be dropped.
>>
>> IMO it's most important that on-time elements are never turned into 
>> late. elements. However the above behavior also seems confusing to users.
>>
>> Worth noting that I don't think that the current behavior is that 
>> much better. If the output watermark is close to the end of the 
>> window, then I think the existing model can also cause this scenario 
>> to happen.
>>
>> Reuven
>>
>> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je.ik@seznam.cz 
>> <ma...@seznam.cz>> wrote:
>>
>>     Hi,
>>
>>     what seems the most "surprising" to me is that we are using
>>     TimestampCombiners to actually do two (orthogonal) things:
>>
>>      a) calculate a watermark hold for a window, so on-time elements
>>     emitted from a pane are not late in downstream processing
>>
>>      b) calculate timestamp of elements in output pane
>>
>>     These two follow a little different constraints - while in case
>>     a) it is not allowed to shift watermark "back in time" in case b)
>>     it seems OK to output data with timestamp lower than output
>>     watermark (what comes late, might leave late). So, while it seems
>>     OK to discard late elements for the sake of calculation output
>>     watermark, it seems wrong to discard them when calculating output
>>     timestamp. Maybe these two timestamps might be held in different
>>     states (the state will be held until GC time for accumulating
>>     panes and reset on discarding panes)?
>>
>>     Jan
>>
>>     On 5/28/20 5:02 PM, David Morávek wrote:
>>>     Hi,
>>>
>>>     I've came across "unexpected" model behaviour when dealing with
>>>     late data and custom timestamp combiners. Let's take a following
>>>     pipeline as an example:
>>>
>>>     final PCollection<String> input = ...;
>>>     input.apply(
>>>           "GlobalWindows",
>>>           Window.<String>into(new GlobalWindows())
>>>               .triggering(
>>>                   AfterWatermark.pastEndOfWindow()
>>>                       .withEarlyFirings(
>>>     AfterProcessingTime.pastFirstElementInPane()
>>>     .plusDelayOf(Duration.standardSeconds(10))))
>>>     .withTimestampCombiner(TimestampCombiner.LATEST)
>>>     .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>>               .accumulatingFiredPanes())
>>>       .apply("Aggregate", Count.perElement())
>>>
>>>     The above pipeline emits updates with the latest input timestamp
>>>     it has seen so far (from non-late elements). We write the output
>>>     from this timestamp to kafka and read it from another pipeline.
>>>
>>>     Problem comes when we need to handle late elements behind output
>>>     watermark. In this case beam can not use combined timestamp and
>>>     uses EOW timestamp instead. Unfortunately this results in
>>>     downstream pipeline progressing it's input watermark to end of
>>>     global window. Also if we would use fixed windows after this
>>>     aggregation, it would yield unexpected results.
>>>
>>>     There is no reasoning about this behaviour in the last section
>>>     of lateness design doc <https://s.apache.org/beam-lateness> [1],
>>>     so I'd like to open a discussion about what the expected result
>>>     should be.
>>>
>>>     My personal opinion is, that correct approach would be emitting
>>>     late elements with currentOutputWatermark rather than EOW in
>>>     case of EARLIEST and LATEST timestamp combiners.
>>>
>>>     I've prepared a faling test case for ReduceFnRunner
>>>     <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>>>     if anyone wants to play around with the issue.
>>>
>>>     I also think that BEAM-2262
>>>     <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be
>>>     related to this discussion.
>>>
>>>     [1] https://s.apache.org/beam-lateness
>>>     [2] https://issues.apache.org/jira/browse/BEAM-2262
>>>     [3]
>>>     https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>>
>>>     Looking forward to hearing your thoughts.
>>>
>>>     Thanks,
>>>     D.
>>

Re: dealing with late data output timestamps

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reuven,

the asynchronicity of watermark update is what I was missing - it is 
what relates watermarkhold with element output timestamp. On the other 
hand, we have some invariants that have to hold, namely:

  a) element arriving as non-late MUST NOT be changed to late

  b) element arriving as late MIGHT be changed to non-late

  c) operator's output watermark MUST be less than input watermark at 
any time

Properties a) and b) are somewhat natural requirements and property c) 
follows the fact, that it is impossible to exactly predict future.

Now, having these three properties, would it be possible to:

  a) when pane contains both late and on-time elements, split the pane 
into two, containing only late and on-time elements

  b) calculate output timestamp of all panes using timestamp combiner 
(now pane contains only late or on time elements, so no timestamp 
combiner should be able to violate neither of properties a) or b))

  c) calculate when there is pane that contains only late elements, 
update watermark hold to min(current input watermark, window gc time) - 
so that the output watermark can progress up to input watermark (and not 
violate property c) above)

I seems to me that what currently stands in the way is that

  a) panes are not split to late and non-late only (and this might be 
tricky, mostly for combining transforms)

  b) the watermark hold with late-only pane is set to window gc time 
(instead of adding the input watermark as well) - [1]

With TimestampCombiner.LATEST and END_OF_WINDOW it seems that splitting 
the panes would not be required, as the timestamp combiner can only 
shift late elements forward (make use of property b)). 
TimestampCombiner.EARLIEST would probably require splitting the panes, 
which seems to solve the mentioned [BEAM-2262].

WDYT?

[1] 
https://github.com/je-ik/beam/commit/9721d82133c672f4fdca5acfad4d6d3ff0fd256f

On 5/29/20 5:01 PM, Reuven Lax wrote:
> This does seem non intuitive, though I'm not sure what the best 
> approach is.
>
> The problem with using currentOutputWatermark as the output timestamp 
> is that Beam does not define watermark advancement to be synchronous, 
> and at least the Dataflow runner calculates watermarks completely 
> independent of bundle processing. This means that the output watermark 
> could advance immediately after being checked, which would cause the 
> records output to be arbitrarily late. So for example, if 
> allowedLateness is 10 seconds, then this trigger will accept a record 
> that is 5 seconds late. However if currentOutputWaternark advances by 
> 15 seconds after checking it, then you would end up outputting a 
> result that is 15 seconds late and therefore would be dropped.
>
> IMO it's most important that on-time elements are never turned into 
> late. elements. However the above behavior also seems confusing to users.
>
> Worth noting that I don't think that the current behavior is that much 
> better. If the output watermark is close to the end of the window, 
> then I think the existing model can also cause this scenario to happen.
>
> Reuven
>
> On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi,
>
>     what seems the most "surprising" to me is that we are using
>     TimestampCombiners to actually do two (orthogonal) things:
>
>      a) calculate a watermark hold for a window, so on-time elements
>     emitted from a pane are not late in downstream processing
>
>      b) calculate timestamp of elements in output pane
>
>     These two follow a little different constraints - while in case a)
>     it is not allowed to shift watermark "back in time" in case b) it
>     seems OK to output data with timestamp lower than output watermark
>     (what comes late, might leave late). So, while it seems OK to
>     discard late elements for the sake of calculation output
>     watermark, it seems wrong to discard them when calculating output
>     timestamp. Maybe these two timestamps might be held in different
>     states (the state will be held until GC time for accumulating
>     panes and reset on discarding panes)?
>
>     Jan
>
>     On 5/28/20 5:02 PM, David Morávek wrote:
>>     Hi,
>>
>>     I've came across "unexpected" model behaviour when dealing with
>>     late data and custom timestamp combiners. Let's take a following
>>     pipeline as an example:
>>
>>     final PCollection<String> input = ...;
>>     input.apply(
>>           "GlobalWindows",
>>           Window.<String>into(new GlobalWindows())
>>               .triggering(
>>                   AfterWatermark.pastEndOfWindow()
>>                       .withEarlyFirings(
>>     AfterProcessingTime.pastFirstElementInPane()
>>     .plusDelayOf(Duration.standardSeconds(10))))
>>     .withTimestampCombiner(TimestampCombiner.LATEST)
>>     .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>>               .accumulatingFiredPanes())
>>       .apply("Aggregate", Count.perElement())
>>
>>     The above pipeline emits updates with the latest input timestamp
>>     it has seen so far (from non-late elements). We write the output
>>     from this timestamp to kafka and read it from another pipeline.
>>
>>     Problem comes when we need to handle late elements behind output
>>     watermark. In this case beam can not use combined timestamp and
>>     uses EOW timestamp instead. Unfortunately this results in
>>     downstream pipeline progressing it's input watermark to end of
>>     global window. Also if we would use fixed windows after this
>>     aggregation, it would yield unexpected results.
>>
>>     There is no reasoning about this behaviour in the last section of
>>     lateness design doc <https://s.apache.org/beam-lateness> [1], so
>>     I'd like to open a discussion about what the expected result
>>     should be.
>>
>>     My personal opinion is, that correct approach would be emitting
>>     late elements with currentOutputWatermark rather than EOW in case
>>     of EARLIEST and LATEST timestamp combiners.
>>
>>     I've prepared a faling test case for ReduceFnRunner
>>     <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
>>     if anyone wants to play around with the issue.
>>
>>     I also think that BEAM-2262
>>     <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be
>>     related to this discussion.
>>
>>     [1] https://s.apache.org/beam-lateness
>>     [2] https://issues.apache.org/jira/browse/BEAM-2262
>>     [3]
>>     https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>>
>>     Looking forward to hearing your thoughts.
>>
>>     Thanks,
>>     D.
>

Re: dealing with late data output timestamps

Posted by Reuven Lax <re...@google.com>.
This does seem non intuitive, though I'm not sure what the best approach is.

The problem with using currentOutputWatermark as the output timestamp is
that Beam does not define watermark advancement to be synchronous, and at
least the Dataflow runner calculates watermarks completely independent of
bundle processing. This means that the output watermark could advance
immediately after being checked, which would cause the records output to be
arbitrarily late. So for example, if allowedLateness is 10 seconds, then
this trigger will accept a record that is 5 seconds late. However if
currentOutputWaternark advances by 15 seconds after checking it, then you
would end up outputting a result that is 15 seconds late and therefore
would be dropped.

IMO it's most important that on-time elements are never turned into late.
elements. However the above behavior also seems confusing to users.

Worth noting that I don't think that the current behavior is that much
better. If the output watermark is close to the end of the window, then I
think the existing model can also cause this scenario to happen.

Reuven

On Fri, May 29, 2020 at 12:54 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> what seems the most "surprising" to me is that we are using
> TimestampCombiners to actually do two (orthogonal) things:
>
>  a) calculate a watermark hold for a window, so on-time elements emitted
> from a pane are not late in downstream processing
>
>  b) calculate timestamp of elements in output pane
>
> These two follow a little different constraints - while in case a) it is
> not allowed to shift watermark "back in time" in case b) it seems OK to
> output data with timestamp lower than output watermark (what comes late,
> might leave late). So, while it seems OK to discard late elements for the
> sake of calculation output watermark, it seems wrong to discard them when
> calculating output timestamp. Maybe these two timestamps might be held in
> different states (the state will be held until GC time for accumulating
> panes and reset on discarding panes)?
>
> Jan
> On 5/28/20 5:02 PM, David Morávek wrote:
>
> Hi,
>
> I've came across "unexpected" model behaviour when dealing with late data
> and custom timestamp combiners. Let's take a following pipeline as an
> example:
>
> final PCollection<String> input = ...;
> input.apply(
>       "GlobalWindows",
>       Window.<String>into(new GlobalWindows())
>           .triggering(
>               AfterWatermark.pastEndOfWindow()
>                   .withEarlyFirings(
>                       AfterProcessingTime.pastFirstElementInPane()
>                           .plusDelayOf(Duration.standardSeconds(10))))
>           .withTimestampCombiner(TimestampCombiner.LATEST)
>           .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>           .accumulatingFiredPanes())
>   .apply("Aggregate", Count.perElement())
>
> The above pipeline emits updates with the latest input timestamp it has
> seen so far (from non-late elements). We write the output from this
> timestamp to kafka and read it from another pipeline.
>
> Problem comes when we need to handle late elements behind output
> watermark. In this case beam can not use combined timestamp and uses EOW
> timestamp instead. Unfortunately this results in downstream pipeline
> progressing it's input watermark to end of global window. Also if we would
> use fixed windows after this aggregation, it would yield unexpected results.
>
> There is no reasoning about this behaviour in the last section of lateness
> design doc <https://s.apache.org/beam-lateness> [1], so I'd like to open
> a discussion about what the expected result should be.
>
> My personal opinion is, that correct approach would be emitting late
> elements with currentOutputWatermark rather than EOW in case of EARLIEST
> and LATEST timestamp combiners.
>
> I've prepared a faling test case for ReduceFnRunner
> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>,
> if anyone wants to play around with the issue.
>
> I also think that BEAM-2262
> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related to
> this discussion.
>
> [1] https://s.apache.org/beam-lateness
> [2] https://issues.apache.org/jira/browse/BEAM-2262
> [3]
> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>
> Looking forward to hearing your thoughts.
>
> Thanks,
> D.
>
>

Re: dealing with late data output timestamps

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

what seems the most "surprising" to me is that we are using 
TimestampCombiners to actually do two (orthogonal) things:

  a) calculate a watermark hold for a window, so on-time elements 
emitted from a pane are not late in downstream processing

  b) calculate timestamp of elements in output pane

These two follow a little different constraints - while in case a) it is 
not allowed to shift watermark "back in time" in case b) it seems OK to 
output data with timestamp lower than output watermark (what comes late, 
might leave late). So, while it seems OK to discard late elements for 
the sake of calculation output watermark, it seems wrong to discard them 
when calculating output timestamp. Maybe these two timestamps might be 
held in different states (the state will be held until GC time for 
accumulating panes and reset on discarding panes)?

Jan

On 5/28/20 5:02 PM, David Morávek wrote:
> Hi,
>
> I've came across "unexpected" model behaviour when dealing with late 
> data and custom timestamp combiners. Let's take a following pipeline 
> as an example:
>
> final PCollection<String> input = ...;
> input.apply(
>       "GlobalWindows",
>       Window.<String>into(new GlobalWindows())
>           .triggering(
>               AfterWatermark.pastEndOfWindow()
>                   .withEarlyFirings(
> AfterProcessingTime.pastFirstElementInPane()
> .plusDelayOf(Duration.standardSeconds(10))))
>           .withTimestampCombiner(TimestampCombiner.LATEST)
> .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
>           .accumulatingFiredPanes())
>   .apply("Aggregate", Count.perElement())
>
> The above pipeline emits updates with the latest input timestamp it 
> has seen so far (from non-late elements). We write the output from 
> this timestamp to kafka and read it from another pipeline.
>
> Problem comes when we need to handle late elements behind output 
> watermark. In this case beam can not use combined timestamp and uses 
> EOW timestamp instead. Unfortunately this results in downstream 
> pipeline progressing it's input watermark to end of global window. 
> Also if we would use fixed windows after this aggregation, it would 
> yield unexpected results.
>
> There is no reasoning about this behaviour in the last section of 
> lateness design doc <https://s.apache.org/beam-lateness> [1], so I'd 
> like to open a discussion about what the expected result should be.
>
> My personal opinion is, that correct approach would be emitting late 
> elements with currentOutputWatermark rather than EOW in case of 
> EARLIEST and LATEST timestamp combiners.
>
> I've prepared a faling test case for ReduceFnRunner 
> <https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b>, 
> if anyone wants to play around with the issue.
>
> I also think that BEAM-2262 
> <https://issues.apache.org/jira/browse/BEAM-2262> [2] may be related 
> to this discussion.
>
> [1] https://s.apache.org/beam-lateness
> [2] https://issues.apache.org/jira/browse/BEAM-2262
> [3] 
> https://github.com/dmvk/beam/commit/c93cd26681aa6fbc83c15d2a7a8146287f1e850b
>
> Looking forward to hearing your thoughts.
>
> Thanks,
> D.