Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2021/02/10 20:14:37 UTC

Can we solve WindowFn.getOutputTime another way?

On a PR (https://github.com/apache/beam/pull/13927) we got into a
discussion of a very old and strange feature of Beam that I think we should
revisit.

The WindowFn has the ability to shift timestamps forward in order to
unblock downstream watermarks. Why? Specifically in this situation:

 - aggregation/GBK with overlapping windows like SlidingWindows
 - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
 - there is another downstream aggregation/GBK

The output watermark of the upstream aggregation is held to the minimum of
the inputs. When an output is emitted, we desire the output to flow through
the rest of the pipeline without delay. However, the downstream aggregation
can (and often will) be delayed by the window size because of watermark
holds in other later windows that are not released until those windows
output.
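
A toy model may make the hold problem concrete. This is a plain-Python sketch, not Beam code, and it rests on stated assumptions: integer timestamps, window offset 0, a window [start, end) fires once the upstream input watermark reaches end, and EARLIEST holds every pending window at the element's timestamp. It borrows the numbers from the worked example later in the thread (60s windows sliding every 10s, one element at 42). assign_sliding_windows and release_time are hypothetical helper names.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Window:
    start: int
    end: int  # exclusive


def assign_sliding_windows(ts: int, size: int, period: int) -> list[Window]:
    """Every [start, start + size) window containing ts, earliest first (offset 0)."""
    last_start = ts - ts % period
    starts = range(last_start, ts - size, -period)
    return sorted(Window(s, s + size) for s in starts)


def release_time(ts: int, size: int, period: int) -> int:
    """Upstream input watermark at which the downstream GBK's input watermark
    first reaches the end of the earliest window, with every hold at ts (EARLIEST)."""
    windows = assign_sliding_windows(ts, size, period)
    first = windows[0]
    for wm in range(first.end, windows[-1].end + 1):
        # Windows that have not fired yet still hold the output watermark at ts.
        pending_holds = [ts for w in windows if w.end > wm]
        output_wm = min([wm, *pending_holds])
        if output_wm >= first.end:
            return wm
    raise AssertionError("unreachable: the last window always releases its hold")


windows = assign_sliding_windows(42, size=60, period=10)
print([(w.start, w.end) for w in windows])
print(release_time(42, size=60, period=10))
```

In this model the first window is ready at 50, but the downstream input watermark only reaches 50 once the last window fires at 100. The delay is 50, i.e. the window size minus the slide, consistent with the correction made later in the thread.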

To avoid this problem, element x in window w will have its timestamp
shifted to not overlap with any earlier windows. It is a weird behavior. It
fixes the watermark hold problem but introduces a strange output with a
mysterious timestamp that is hard to justify.
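
The shift can be sketched in the same toy model by moving each window's hold (and output timestamp) to max(ts, end(w) - period), so it is never earlier than the end of the window that starts one slide before w. That rule is an assumption picked to reproduce the 42-to-50 example later in the thread, not a transcription of Java's SlidingWindows#getOutputTime, and the helper names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Window:
    start: int
    end: int  # exclusive


def assign_sliding_windows(ts: int, size: int, period: int) -> list[Window]:
    last_start = ts - ts % period
    return sorted(Window(s, s + size) for s in range(last_start, ts - size, -period))


def shifted_output_time(ts: int, window: Window, period: int) -> int:
    """Assumed shifting rule: never earlier than the end of the previous window."""
    return max(ts, window.end - period)


def release_time(ts: int, size: int, period: int, shift: bool) -> int:
    """Upstream watermark at which downstream can fire the earliest window."""
    windows = assign_sliding_windows(ts, size, period)
    first = windows[0]

    def hold(w: Window) -> int:
        return shifted_output_time(ts, w, period) if shift else ts

    for wm in range(first.end, windows[-1].end + 1):
        # Output watermark = min(input watermark, holds of windows not yet fired).
        output_wm = min([wm, *(hold(w) for w in windows if w.end > wm)])
        if output_wm >= first.end:
            return wm
    raise AssertionError("unreachable")


windows = assign_sliding_windows(42, 60, 10)
print([shifted_output_time(42, w, 10) for w in windows])  # [42, 50, 60, 70, 80, 90]
print(release_time(42, 60, 10, shift=False), release_time(42, 60, 10, shift=True))  # 100 50
```

Downstream now sees watermark 50 as soon as the first window fires, but the copies of the element in later windows carry timestamps 50 through 90 instead of the 42 that EARLIEST asked for.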

Any other ideas?

Kenn

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Kenneth Knowles <ke...@apache.org>.
OK. I also prefer the delete option. The main issue that remains is SQL, or
joins in general.

Kenn

On Wed, Feb 17, 2021 at 1:17 PM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Slight correction: you mean default SlidingWindows? This is the only
>> nontrivial implementation I know of.
>>
>
> Yes.
>
>
>> Sessions does not shift the timestamp because you can't really guess how
>> far to shift it. Sessions with EARLIEST just get hung entirely whenever
>> there is a long-lived session.
>>
>> I've thought about it a bit more, and I think the issue is not best
>> solved by a WindowFn. It requires all the data in the WindowingStrategy to
>> know whether the hack is useful.
>>
>
> I agree. The intent is not to have shifted timestamps, it is to have more
> eager data propagation. Shifting timestamps is a (lossy) mechanism to
> achieve this with our current watermark implementation.
>
>
>> So here is a proposal:
>>
>>  - add a TimestampCombiner (aka OutputTime enum in the proto) for
>> EARLIEST_NON_OVERLAPPING
>>  - only call WindowFn#getOutputTime in this case
>>
>
> Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar
> (as WindowFn#getOutputTime is not restricted to non-overlapping).
>
>
>> This is necessarily a breaking change. Users who are using EARLIEST with
>> SlidingWindows will see a change in behavior. It can be flipped:
>>
>>  - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
>> does not call WindowFn#getOutputTime and implement it
>>  - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>>
>> Or third option:
>>
>>  - delete WindowFn#getOutputTime and pretend it never existed.
>> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
>> well with EARLIEST.
>>
>
> I would go with this. Possibly with a transition period in which we
> support an opt-in option for the old behavior on Java on non-portable
> runners. At least until we can figure out what we really want and add it to
> the model.
>
>
>>
>> Kenn
>>
>> On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> OK, so to move forward, shall we update the default Sessions to not do
>>> this timestamp shifting, perhaps with a (deprecated) timestamp-shifting
>>> opt-in variant to ease the transition for those that want the old (marked
>>> experimental) behavior?
>>>
>>> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> All of this is right. Things have changed a lot. Nowadays the default
>>>> will work well, and we can caveat to users that EARLIEST may hold up
>>>> downstream output for overlapping windows.
>>>>
>>>> I'm slightly concerned about the fact that EARLIEST is necessary for
>>>> CoGBK joins, unless there is some special consideration why it doesn't
>>>> matter. So I wonder what happens when a pipeline has a few different joins.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> Yes, unless you manually set the timestamp combiner to earliest, which
>>>>> in this case gives earliest + shifted.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> The default now is end of window, right? Doesn't that alleviate the
>>>>>> problem that the original change was supposed to fix?
>>>>>>
>>>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The default timestamp combiner used to be earliest as well.
>>>>>>>
>>>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> IIRC, this was introduced because at the time users complained that
>>>>>>>> sliding windows were virtually unusable for reasonably-sized windows.
>>>>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>>>>> maybe this is less of a problem now?
>>>>>>>>
>>>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <
>>>>>>>> robertwb@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <
>>>>>>>>>>> kenn@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got
>>>>>>>>>>>> into a discussion of a very old and strange feature of Beam that I think we
>>>>>>>>>>>> should revisit.
>>>>>>>>>>>>
>>>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in
>>>>>>>>>>>> order to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>>>>
>>>>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of
>>>>>>>>>>>> the inputs
>>>>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>>>>
>>>>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>>>>>>>> because of *watermark holds in other later windows that are
>>>>>>>>>>>> not released until those windows output.*
>>>>>>>>>>>>
>>>>>>>>>>> Could you describe this a bit more? Why would later windows hold
>>>>>>>>>>> up the watermark for upstream steps? (Is it due to some subtlety? Such as
>>>>>>>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>>>
>>>>>>>>>> Suppose:
>>>>>>>>>>
>>>>>>>>>>  - Default triggering
>>>>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>>>>  - 60s windows sliding every 10s
>>>>>>>>>>  - An element with timestamp 42
>>>>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>>>>
>>>>>>>>>> Here is what happens:
>>>>>>>>>>
>>>>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>>>>  - For each of those windows the output watermark hold is set to
>>>>>>>>>> 42 (the element's timestamp)
>>>>>>>>>>  - At time 50 the aggregation (A) over the first window is
>>>>>>>>>> emitted; the other windows remain buffered and held
>>>>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>>>>> the input watermark (which is the held output watermark from A) is still
>>>>>>>>>> 42, even though no other data will arrive for that window (WLOG if elements
>>>>>>>>>> from other keys are shuffled in)
>>>>>>>>>>  - The input watermark for aggregation (B) does not advance past
>>>>>>>>>> 42 until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>>>
>>>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong: it is not
>>>>>>>>>> delayed by the window size, but by the spread between the end-of-window
>>>>>>>>>> timestamps of all the assigned windows (window size minus slide; here
>>>>>>>>>> 100 - 50 = 60 - 10 = 50).
>>>>>>>>>>
>>>>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>>>>>>>> not overlap the prior window. A timestamp of 50 is very unintuitive, since
>>>>>>>>>> you asked for the EARLIEST of the input timestamps. The EARLIEST combiner plays an
>>>>>>>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>>>>>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>>>>>>>> shifting may actually break SQL...
>>>>>>>>>>
>>>>>>>>>> This predated our switch away from "delta from watermark" late
>>>>>>>>>> data dropping to "window expiry" data dropping. So maybe there is some new
>>>>>>>>>> way to set a hold that does not make data late or droppable but still uses
>>>>>>>>>> the EARLIEST timestamp. That is my question, for which I have not figured
>>>>>>>>>> out the answer.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is, indeed, a very tough question...
>>>>>>>>>
>>>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>>>>> windows. E.g. for sessions, a long-lived key can hold up the watermark for all
>>>>>>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>>>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>>>>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>>>>>>>> that the values get re-windowed in which case this element should get
>>>>>>>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>>>>>>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>>>>>>>> is non-local).
>>>>>>>>>
>>>>>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>>>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>>>>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>>>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>>>>>> think the Beam model requires that we have a single watermark, just that we
>>>>>>>>> fire triggers/timers once we have seen all the on-time data that we think
>>>>>>>>> we could, and a runner could be smart about this.
>>>>>>>>>
>>>>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a weird
>>>>>>>>>>>> behavior. It fixes the watermark hold problem but introduces a strange
>>>>>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>>>
>>>>>>>>>>>> Any other ideas?
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <ke...@apache.org> wrote:

> Slight correction: you mean default SlidingWindows? This is the only
> nontrivial implementation I know of.
>

Yes.


> Sessions does not shift the timestamp because you can't really guess how
> far to shift it. Sessions with EARLIEST just get hung entirely whenever
> there is a long-lived session.
>
> I've thought about it a bit more, and I think the issue is not best solved
> by a WindowFn. It requires all the data in the WindowingStrategy to know
> whether the hack is useful.
>

I agree. The intent is not to have shifted timestamps, it is to have more
eager data propagation. Shifting timestamps is a (lossy) mechanism to
achieve this with our current watermark implementation.
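
The point made earlier in the thread, that the model does not require a single watermark, can be illustrated with the same toy numbers. The sketch contrasts one watermark per PCollection with a hypothetical per-window watermark at the moment A has fired [-10, 50) and still holds the other five windows at 42. global_watermark and per_window_watermark are illustrative names, not Beam or runner APIs, and the per-window version assumes the downstream aggregation keeps A's windows (no re-windowing).

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Window:
    start: int
    end: int  # exclusive


def assign_sliding_windows(ts: int, size: int, period: int) -> list[Window]:
    last_start = ts - ts % period
    return sorted(Window(s, s + size) for s in range(last_start, ts - size, -period))


def global_watermark(upstream_wm: int, pending: dict[Window, int]) -> int:
    """One watermark per PCollection: every pending hold counts."""
    return min([upstream_wm, *pending.values()])


def per_window_watermark(upstream_wm: int, pending: dict[Window, int], window: Window) -> int:
    """Hypothetical per-window watermark: only pending panes of `window` count,
    assuming the downstream GBK keeps the upstream windows (no re-windowing)."""
    return min([upstream_wm, *(h for w, h in pending.items() if w == window)])


windows = assign_sliding_windows(42, 60, 10)
upstream_wm = 50  # A has just fired [-10, 50); the other five panes still hold 42
pending = {w: 42 for w in windows if w.end > upstream_wm}
print(global_watermark(upstream_wm, pending))                  # 42: B waits
print(per_window_watermark(upstream_wm, pending, windows[0]))  # 50: B can fire [-10, 50)
```

In this sketch the [-10, 50) result can move on at 50 without any timestamp being shifted; the cost is that holds are tracked per window rather than per PCollection.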


> So here is a proposal:
>
>  - add a TimestampCombiner (aka OutputTime enum in the proto) for
> EARLIEST_NON_OVERLAPPING
>  - only call WindowFn#getOutputTime in this case
>

Or should this be called EARLIEST_SHIFTING_TIMESTAMP or similar
(as WindowFn#getOutputTime is not restricted to non-overlapping).


> This is necessarily a breaking change. Users who are using EARLIEST with
> SlidingWindows will see a change in behavior. It can be flipped:
>
>  - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
> does not call WindowFn#getOutputTime and implement it
>  - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>
> Or third option:
>
>  - delete WindowFn#getOutputTime and pretend it never existed.
> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
> well with EARLIEST.
>

I would go with this. Possibly with a transition period in which we support
an opt-in option for the old behavior on Java on non-portable runners. At
least until we can figure out what we really want and add it to the model.


>
> Kenn

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Kenneth Knowles <ke...@apache.org>.
These are all marked experimental, but they are also all many years old.
However, this is exactly why they are experimental: they aren't really a
good solution that we are totally confident in.

Kenn

On Wed, Feb 17, 2021 at 12:30 PM Kenneth Knowles <ke...@apache.org> wrote:

> Slight correction: you mean default SlidingWindows? This is the only
> nontrivial implementation I know of. Sessions does not shift the timestamp
> because you can't really guess how far to shift it. Sessions with EARLIEST
> just get hung entirely whenever there is a long-lived session.
>
> I've thought about it a bit more, and I think the issue is not best solved
> by a WindowFn. It requires all the data in the WindowingStrategy to know
> whether the hack is useful. So here is a proposal:
>
>  - add a TimestampCombiner (aka OutputTime enum in the proto) for
> EARLIEST_NON_OVERLAPPING
>  - only call WindowFn#getOutputTime in this case
>
> This is necessarily a breaking change. Users who are using EARLIEST with
> SlidingWindows will see a change in behavior. It can be flipped:
>
>  - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
> does not call WindowFn#getOutputTime and implement it
>  - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime
>
> Or third option:
>
>  - delete WindowFn#getOutputTime and pretend it never existed.
> SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
> well with EARLIEST.
>
> Kenn

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Kenneth Knowles <ke...@apache.org>.
Slight correction: you mean default SlidingWindows? This is the only
nontrivial implementation I know of. Sessions does not shift the timestamp
because you can't really guess how far to shift it. Sessions with EARLIEST
just get hung entirely whenever there is a long-lived session.

I've thought about it a bit more, and I think the issue is not best solved
by a WindowFn. It requires all the data in the WindowingStrategy to know
whether the hack is useful. So here is a proposal:

 - add a TimestampCombiner (aka OutputTime enum in the proto) for
EARLIEST_NON_OVERLAPPING
 - only call WindowFn#getOutputTime in this case

This is necessarily a breaking change. Users who are using EARLIEST with
SlidingWindows will see a change in behavior. It can be flipped:

 - add a TimestampCombiner EARLIEST_XYZ (not sure what to call it) that
does not call WindowFn#getOutputTime and implement it
 - deprecate EARLIEST but have it keep calling WindowFn#getOutputTime

Or third option:

 - delete WindowFn#getOutputTime and pretend it never existed.
SlidingWindows simply don't work well with EARLIEST. CoGBK joins don't work
well with EARLIEST.
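A rough Python model of the first option, assuming integer timestamps and a sliding-window shift of max(ts, end - period), which reproduces the 42-to-50 example for the [0, 60) window. `Combiner` is a stand-in enum, not Beam's TimestampCombiner, and EARLIEST_NON_OVERLAPPING is only the name proposed above; the helper functions are hypothetical.

```python
from enum import Enum


class Combiner(Enum):
    """Stand-in for the timestamp combiners; EARLIEST_NON_OVERLAPPING is hypothetical."""
    EARLIEST = "earliest"
    LATEST = "latest"
    END_OF_WINDOW = "end_of_window"
    EARLIEST_NON_OVERLAPPING = "earliest_non_overlapping"  # proposed name only


def shift_past_previous_window(ts, window, period):
    """Never earlier than the end of the previous sliding window, [start - period, end - period)."""
    _, end = window
    return max(ts, end - period)


def combine_output_time(combiner, timestamps, window, period):
    """Output timestamp of one aggregated pane for window = (start, end)."""
    _, end = window
    if combiner is Combiner.EARLIEST:
        return min(timestamps)  # option one: plain EARLIEST no longer shifts
    if combiner is Combiner.LATEST:
        return max(timestamps)
    if combiner is Combiner.END_OF_WINDOW:
        return end - 1  # the window's max timestamp, assuming one-unit resolution
    if combiner is Combiner.EARLIEST_NON_OVERLAPPING:
        # The only combiner that consults the shift (WindowFn#getOutputTime in Java).
        return min(shift_past_previous_window(ts, window, period) for ts in timestamps)
    raise ValueError(f"unknown combiner: {combiner}")


print(combine_output_time(Combiner.EARLIEST, [42, 45], (0, 60), 10))
print(combine_output_time(Combiner.EARLIEST_NON_OVERLAPPING, [42, 45], (0, 60), 10))
```

Under this option, plain EARLIEST returns 42 for the [0, 60) window while EARLIEST_NON_OVERLAPPING returns 50. The second option would keep the same two behaviors but swap which name carries the shift.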

Kenn

On Wed, Feb 17, 2021 at 11:16 AM Robert Bradshaw <ro...@google.com>
wrote:

> OK, so to move forward, shall we update the default Sessions to not do
> this timestamp shifting, perhaps with a (deprecated) timestamp-shifting
> opt-in variant to ease the transition for those that want the old (marked
> experimental) behavior?
>
> On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> All of this is right. Things have changed a lot. Nowadays the default
>> will work well, and we can caveat to users that EARLIEST may hold up
>> downstream output for overlapping windows.
>>
>> I'm slightly concerned about the fact that EARLIEST is necessary for
>> CoGBK joins, unless there is some special consideration why it doesn't
>> matter. So I wonder what happens when a pipeline has a few different joins.
>>
>> Kenn
>>
>> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Yes, unless you manually set the timestamp combiner to earliest, which
>>> in this case gives earliest + shifted.
>>>
>>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> The default now is end of window, right? Doesn't that alleviate the
>>>> problem that the original change was supposed to fix?
>>>>
>>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> The default timestamp combiner used to be earliest as well.
>>>>>
>>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> IIRC, this was introduced because at the time users complained that
>>>>>> sliding windows were virtually unusable for reasonably sized windows.
>>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>>> maybe this is less of a problem now?
>>>>>>
>>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into
>>>>>>>>>> a discussion of a very old and strange feature of Beam that I think we
>>>>>>>>>> should revisit.
>>>>>>>>>>
>>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>>
>>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of
>>>>>>>>>> the inputs
>>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>>
>>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>>>>>> because of *watermark holds in other later windows that are not
>>>>>>>>>> released until those windows output.*
>>>>>>>>>>
>>>>>>>>> Could you describe this a bit more? Why would later windows hold
>>>>>>>>> up the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>>>>>> tracking the watermark for each stage rather than for each step?)
>>>>>>>>>
>>>>>>>>
>>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>>
>>>>>>>> Suppose:
>>>>>>>>
>>>>>>>>  - Default triggering
>>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>>  - 60s windows sliding every 10s
>>>>>>>>  - An element with timestamp 42
>>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>>
>>>>>>>> Here is what happens:
>>>>>>>>
>>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>>>>> (the element's timestamp)
>>>>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>>> the other windows remain buffered and held
>>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>>> the input watermark (which is the held output watermark from A) is still
>>>>>>>> 42, even though no other data will arrive for that window (WLOG if elements
>>>>>>>> from other keys are shuffled in)
>>>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>>>>
>>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not
>>>>>>>> delayed by the window size, but by the difference in end-of-window
>>>>>>>> timestamps across all assigned windows (window size minus slide?)
>>>>>>>>
>>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>>>>>> not overlap the prior window. A timestamp of 50 is very nonintuitive since
>>>>>>>> you asked for the EARLIEST of input timestamps. The EARLIEST combiner plays an
>>>>>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>>>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>>>>>> shifting may actually break SQL...
>>>>>>>>
>>>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>>>>>> to set a hold that does not make data late or droppable but still use the
>>>>>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>>>>>> the answer.
>>>>>>>>
>>>>>>>
>>>>>>> This is, indeed, a very tough question...
>>>>>>>
>>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>>>>>> that the values get re-windowed in which case this element should get
>>>>>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>>>>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>>>>>> is non-local).
>>>>>>>
>>>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>>>> think the Beam model requires that we have a single watermark, just that we
>>>>>>> fire triggers/timers once we have seen all the on-time data that we think
>>>>>>> we could, and a runner could be smart about this.
>>>>>>>
>>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>>>
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a weird
>>>>>>>>>> behavior. It fixes the watermark hold problem but introduces a strange
>>>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>>>
>>>>>>>>>> Any other ideas?
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Robert Bradshaw <ro...@google.com>.
OK, so to move forward, shall we update the default Sessions to not do this
timestamp shifting, perhaps with a (deprecated) timestamp-shifting opt-in
variant to ease the transition for those that want the old (marked
experimental) behavior?

On Fri, Feb 12, 2021 at 9:12 AM Kenneth Knowles <ke...@apache.org> wrote:

> All of this is right. Things have changed a lot. Nowadays the default will
> work well, and we can caveat to users that EARLIEST may hold up downstream
> output for overlapping windows.
>
> I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK
> joins, unless there is some special consideration why it doesn't matter. So
> I wonder what happens when a pipeline has a few different joins.
>
> Kenn
>
> On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Yes, unless you manually set the timestamp combiner to earliest, which in
>> this case gives earliest + shifted.
>>
>> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>>
>>> The default now is end of window, right? Doesn't that alleviate the
>>> problem that the original change was supposed to fix?
>>>
>>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> The default timestamp combiner used to be earliest as well.
>>>>
>>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> IIRC, this was introduced because at the time users complained that
>>>>> sliding windows were virtually unusable for reasonably sized windows.
>>>>> However this was before we allowed customizing the timestamp combiner, so
>>>>> maybe this is less of a problem now?
>>>>>
>>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>>> discussion of a very old and strange feature of Beam that I think we should
>>>>>>>>> revisit.
>>>>>>>>>
>>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>>
>>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>>> inputs
>>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>>
>>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>>>>> because of *watermark holds in other later windows that are not
>>>>>>>>> released until those windows output.*
>>>>>>>>>
>>>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>>>> the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>>>>> tracking the watermark for each stage rather than for each step?)
>>>>>>>>
>>>>>>>
>>>>>>> It does not have to do with stages/fusion (a runner-specific
>>>>>>> concept) but is a necessity of watermarks being per-PCollection.
>>>>>>>
>>>>>>> Suppose:
>>>>>>>
>>>>>>>  - Default triggering
>>>>>>>  - Timestamp combiner EARLIEST
>>>>>>>  - 60s windows sliding every 10s
>>>>>>>  - An element with timestamp 42
>>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>>
>>>>>>> Here is what happens:
>>>>>>>
>>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and
>>>>>>> [20, 80) and [30, 90) and [40, 100)
>>>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>>>> (the element's timestamp)
>>>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>>>> the other windows remain buffered and held
>>>>>>>  - The element arrives at aggregation (B) and is buffered because
>>>>>>> the input watermark (which is the held output watermark from A) is still
>>>>>>> 42, even though no other data will arrive for that window (WLOG if elements
>>>>>>> from other keys are shuffled in)
>>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>>>
>>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not
>>>>>>> delayed by the window size, but by the difference in end-of-window
>>>>>>> timestamps across all assigned windows (window size minus slide?)
>>>>>>>
>>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>>>>> not overlap the prior window. A timestamp of 50 is very nonintuitive since
>>>>>>> you asked for the EARLIEST of input timestamps. The EARLIEST combiner plays an
>>>>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>>>>> shifting may actually break SQL...
>>>>>>>
>>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>>>>> to set a hold that does not make data late or droppable but still use the
>>>>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>>>>> the answer.
>>>>>>>
>>>>>>
>>>>>> This is, indeed, a very tough question...
>>>>>>
>>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>>>>> that the values get re-windowed in which case this element should get
>>>>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>>>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>>>>> is non-local).
>>>>>>
>>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>>> think the Beam model requires that we have a single watermark, just that we
>>>>>> fire triggers/timers once we have seen all the on-time data that we think
>>>>>> we could, and a runner could be smart about this.
>>>>>>
>>>>>> We may want to keep the ability to shift timestamps for WindowFns,
>>>>>> but I think we shouldn't be doing so for the default sliding windows.
>>>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>>
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a weird
>>>>>>>>> behavior. It fixes the watermark hold problem but introduces a strange
>>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>>
>>>>>>>>> Any other ideas?
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Kenneth Knowles <ke...@apache.org>.
All of this is right. Things have changed a lot. Nowadays the default will
work well, and we can caveat to users that EARLIEST may hold up downstream
output for overlapping windows.

I'm slightly concerned about the fact that EARLIEST is necessary for CoGBK
joins, unless there is some special consideration why it doesn't matter. So
I wonder what happens when a pipeline has a few different joins.
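To see why the shift is awkward for joins, here is a toy CoGBK-style inner join in Python for a single sliding window. It assumes each joined row is re-exploded with the grouped EARLIEST timestamp, optionally shifted to max(earliest, end - period). `cogbk_join` is a hypothetical illustration, not how Beam SQL implements joins.

```python
from collections import defaultdict
from itertools import product


def cogbk_join(left, right, window, period, shift):
    """Inner-join (key, value, ts) rows of one window, CoGBK style.

    Each output row gets the grouped EARLIEST timestamp, optionally pushed
    past the end of the previous sliding window (end - period).
    """
    grouped = defaultdict(lambda: ([], []))
    for key, value, ts in left:
        grouped[key][0].append((value, ts))
    for key, value, ts in right:
        grouped[key][1].append((value, ts))

    _, end = window
    rows = []
    for key, (lefts, rights) in sorted(grouped.items()):
        if not lefts or not rights:
            continue  # inner join: the key must appear on both sides
        earliest = min(ts for _, ts in lefts + rights)
        out_ts = max(earliest, end - period) if shift else earliest
        # Re-explode the grouped iterables into joined rows.
        for (lv, _), (rv, _) in product(lefts, rights):
            rows.append((key, lv, rv, out_ts))
    return rows


left = [("k", "a", 42)]
right = [("k", "b", 45)]
print(cogbk_join(left, right, window=(0, 60), period=10, shift=False))  # [('k', 'a', 'b', 42)]
print(cogbk_join(left, right, window=(0, 60), period=10, shift=True))  # [('k', 'a', 'b', 50)]
```

Without the shift the joined row carries 42, the earliest input; with it the row carries 50, a timestamp none of the inputs had.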

Kenn

On Fri, Feb 12, 2021 at 12:37 AM Robert Bradshaw <ro...@google.com>
wrote:

> Yes, unless you manually set the timestamp combiner to earliest, which in
> this case gives earliest + shifted.
>
> On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:
>
>> The default now is end of window, right? Doesn't that alleviate the
>> problem that the original change was supposed to fix?
>>
>> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> The default timestamp combiner used to be earliest as well.
>>>
>>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> IIRC, this was introduced because at the time users complained that
>>>> sliding windows were virtually unusable for reasonably sized windows.
>>>> However this was before we allowed customizing the timestamp combiner, so
>>>> maybe this is less of a problem now?
>>>>
>>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>>> discussion of a very old and strange feature of Beam that I think we should
>>>>>>>> revisit.
>>>>>>>>
>>>>>>>> The WindowFn has the ability to shift timestamps forward in order
>>>>>>>> to unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>>
>>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>>> inputs
>>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>>
>>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>>>> because of *watermark holds in other later windows that are not
>>>>>>>> released until those windows output.*
>>>>>>>>
>>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>>> the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>>>> tracking the watermark for each stage rather than for each step?)
>>>>>>>
>>>>>>
>>>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>>>> but is a necessity of watermarks being per-PCollection.
>>>>>>
>>>>>> Suppose:
>>>>>>
>>>>>>  - Default triggering
>>>>>>  - Timestamp combiner EARLIEST
>>>>>>  - 60s windows sliding every 10s
>>>>>>  - An element with timestamp 42
>>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>>
>>>>>> Here is what happens:
>>>>>>
>>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20,
>>>>>> 80) and [30, 90) and [40, 100)
>>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>>> (the element's timestamp)
>>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>>> the other windows remain buffered and held
>>>>>>  - The element arrives at aggregation (B) and is buffered because the
>>>>>> input watermark (which is the held output watermark from A) is still 42,
>>>>>> even though no other data will arrive for that window (WLOG if elements
>>>>>> from other keys are shuffled in)
>>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>>
>>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>>>> by the window size, but by the difference in end-of-window timestamps
>>>>>> across all assigned windows (window size minus slide?)
>>>>>>
>>>>>> So to avoid this, what actually happens in Java today is that the
>>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>>>> not overlap the prior window. A timestamp of 50 is very nonintuitive since
>>>>>> you asked for the EARLIEST of input timestamps. The EARLIEST combiner plays an
>>>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>>>> shifting may actually break SQL...
>>>>>>
>>>>>> This predated our switch away from "delta from watermark" late data
>>>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>>>> to set a hold that does not make data late or droppable but still use the
>>>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>>>> the answer.
>>>>>>
>>>>>
>>>>> This is, indeed, a very tough question...
>>>>>
>>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>>> don't think doing this adjustment is the right thing. It would certainly
>>>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>>>> that the values get re-windowed in which case this element should get
>>>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>>>> is non-local).
>>>>>
>>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>>> think the Beam model requires that we have a single watermark, just that we
>>>>> fire triggers/timers once we have seen all the on-time data that we think
>>>>> we could, and a runner could be smart about this.
>>>>>
>>>>> We may want to keep the ability to shift timestamps for WindowFns, but
>>>>> I think we shouldn't be doing so for the default sliding windows.
>>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>>
>>>>>
>>>>>> Kenn
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>> To avoid this problem, element x in window w will have its
>>>>>>>> timestamp shifted to not overlap with any earlier windows. It is a weird
>>>>>>>> behavior. It fixes the watermark hold problem but introduces a strange
>>>>>>>> output with a mysterious timestamp that is hard to justify.
>>>>>>>>
>>>>>>>> Any other ideas?
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Robert Bradshaw <ro...@google.com>.
Yes, unless you manually set the timestamp combiner to earliest, which in
this case gives earliest + shifted.
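For the running example (an element at 42, 60s windows sliding every 10s), "earliest + shifted" can be modeled per window as below. This is a sketch assuming integer timestamps and a shift that moves a timestamp to no earlier than the end of the previous window (end - period); it is a simplified model, not the SlidingWindows source, and both helper names are hypothetical.

```python
def assign_sliding_windows(ts, size, period):
    """All windows [start, start + size), start a multiple of period, that contain ts."""
    start = ts - ts % period  # latest aligned start not after ts
    windows = []
    while start > ts - size:  # [start, start + size) contains ts iff start > ts - size
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def shifted_output_time(ts, window, period):
    """EARLIEST plus shift: never earlier than the end of the previous window."""
    _, end = window
    return max(ts, end - period)


ts, size, period = 42, 60, 10
for window in assign_sliding_windows(ts, size, period):
    print(window, "EARLIEST:", ts, "shifted:", shifted_output_time(ts, window, period))
```

Only the first window, [-10, 50), keeps 42; the others get 50, 60, 70, 80, and 90, so each window's hold begins where the previous window ends.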

On Fri, Feb 12, 2021 at 12:33 AM Reuven Lax <re...@google.com> wrote:

> The default now is end of window, right? Doesn't that alleviate the
> problem that the original change was supposed to fix?
>
> On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> The default timestamp combiner used to be earliest as well.
>>
>> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>
>>> IIRC, this was introduced because at the time users complained that
>>> sliding windows were virtually unusable for reasonably sized windows.
>>> However this was before we allowed customizing the timestamp combiner, so
>>> maybe this is less of a problem now?
>>>
>>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>>> discussion of a very old and strange feature of Beam that I think we should
>>>>>>> revisit.
>>>>>>>
>>>>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>>
>>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>>> inputs
>>>>>>>  - there is another downstream aggregation/GBK
>>>>>>>
>>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>>> because of *watermark holds in other later windows that are not
>>>>>>> released until those windows output.*
>>>>>>>
>>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>>> the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>>> tracking the watermark for each stage rather than for each step?)
>>>>>>
>>>>>
>>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>>> but is a necessity of watermarks being per-PCollection.
>>>>>
>>>>> Suppose:
>>>>>
>>>>>  - Default triggering
>>>>>  - Timestamp combiner EARLIEST
>>>>>  - 60s windows sliding every 10s
>>>>>  - An element with timestamp 42
>>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>>
>>>>> Here is what happens:
>>>>>
>>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20,
>>>>> 80) and [30, 90) and [40, 100)
>>>>>  - For each of those windows the output watermark hold is set to 42
>>>>> (the element's timestamp)
>>>>>  - At time 50 the aggregation (A) over the first window is emitted;
>>>>> the other windows remain buffered and held
>>>>>  - The element arrives at aggregation (B) and is buffered because the
>>>>> input watermark (which is the held output watermark from A) is still 42,
>>>>> even though no other data will arrive for that window (WLOG if elements
>>>>> from other keys are shuffled in)
>>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>>
>>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>>> by the window size, but by the difference in end-of-window timestamps
>>>>> across all assigned windows (window size minus slide?)
>>>>>
>>>>> So to avoid this, what actually happens in Java today is that the
>>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>>> not overlap the prior window. A timestamp of 50 is very nonintuitive since
>>>>> you asked for the EARLIEST of input timestamps. The EARLIEST combiner plays an
>>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>>> shifting may actually break SQL...
>>>>>
>>>>> This predated our switch away from "delta from watermark" late data
>>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>>> to set a hold that does not make data late or droppable but still use the
>>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>>> the answer.
>>>>>
>>>>
>>>> This is, indeed, a very tough question...
>>>>
>>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>>> windows. E.g. for sessions, a long key can hold up the watermark for all
>>>> others. Here we "know" what the holdup is, and can adjust for it. But I
>>>> don't think doing this adjustment is the right thing. It would certainly
>>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>>> that the values get re-windowed in which case this element should get
>>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>>> is non-local).
>>>>
>>>> Logically, the reason we want the [-10, 50) window for B to fire shortly
>>>> after the input watermark for A passes 50 is that no non-late data coming
>>>> out of A could influence it. In some sense, the "watermark" for the [-10,
>>>> 50) windows has indeed passed, but not that for later windows. I don't
>>>> think the Beam model requires that we have a single watermark, just that we
>>>> fire triggers/timers once we have seen all the on-time data that we think
>>>> we could, and a runner could be smart about this.
>>>>
>>>> We may want to keep the ability to shift timestamps for WindowFns, but
>>>> I think we shouldn't be doing so for the default sliding windows.
>>>> Correctness (of output timestamps) over latency unless one asks otherwise.
>>>>
>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>>> To avoid this problem, element x in window w will have its timestamp
>>>>>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>>>>>> fixes the watermark hold problem but introduces a strange output with a
>>>>>>> mysterious timestamp that is hard to justify.
>>>>>>>
>>>>>>> Any other ideas?
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Reuven Lax <re...@google.com>.
The default now is end of window, right? Doesn't that alleviate the problem
that the original change was supposed to fix?
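A small simulation of why an END_OF_WINDOW default sidesteps the problem, using the running example. It assumes that A's output watermark is the minimum of its input watermark and the holds of windows that have not yet fired (a window [start, end) fires once A's input watermark reaches end), and that B's [-10, 50) window can fire once A's output watermark reaches 50. The helper names are hypothetical.

```python
# Running example: element at 42, 60-unit windows sliding every 10.
WINDOWS = [(-10, 50), (0, 60), (10, 70), (20, 80), (30, 90), (40, 100)]


def output_watermark_a(input_wm, hold_for):
    """A's output watermark: its input watermark, held by every window not yet fired."""
    holds = [hold_for(w) for w in WINDOWS if input_wm < w[1]]
    return min([input_wm] + holds)


def when_b_can_fire(hold_for, b_window_end=50):
    """Smallest input watermark at A that lets A's output watermark reach b_window_end."""
    t = b_window_end
    while output_watermark_a(t, hold_for) < b_window_end:
        t += 1
    return t


policies = {
    "EARLIEST": lambda w: 42,
    "EARLIEST + shift": lambda w: max(42, w[1] - 10),
    "END_OF_WINDOW": lambda w: w[1] - 1,  # window max timestamp, one-unit resolution
}
for name, hold_for in policies.items():
    print(name, when_b_can_fire(hold_for))
```

Plain EARLIEST keeps B waiting until A's input watermark reaches 100, a delay of 50 (window size minus slide); both the shifted EARLIEST hold and END_OF_WINDOW let B fire at 50.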

On Fri, Feb 12, 2021 at 12:25 AM Robert Bradshaw <ro...@google.com>
wrote:

> The default timestamp combiner used to be earliest as well.
>
> On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>
>> IIRC, this was introduced because at the time users complained that
>> sliding windows were virtually unusable for reasonably sized windows.
>> However this was before we allowed customizing the timestamp combiner, so
>> maybe this is less of a problem now?
>>
>> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>>> discussion of a very old and strange feature of Beam that I think we should
>>>>>> revisit.
>>>>>>
>>>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>>>
>>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>>> inputs
>>>>>>  - there is another downstream aggregation/GBK
>>>>>>
>>>>>> The output watermark of the upstream aggregation is held to the
>>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>>> flow through the rest of the pipeline without delay. However, the
>>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>>> because of *watermark holds in other later windows that are not
>>>>>> released until those windows output.*
>>>>>>
>>>>> Could you describe this a bit more? Why would later windows hold up
>>>>> the watermark for upstream steps? (Is it due to some subtlety, such as
>>>>> tracking the watermark for each stage, rather than for each step?)
>>>>>
>>>>
>>>> It does not have to do with stages/fusion (a runner-specific concept)
>>>> but is a necessity of watermarks being per-PCollection.
>>>>
>>>> Suppose:
>>>>
>>>>  - Default triggering
>>>>  - Timestamp combiner EARLIEST
>>>>  - 60s windows sliding every 10s
>>>>  - An element with timestamp 42
>>>>  - Aggregation (A) with downstream aggregation (B)
>>>>
>>>> Here is what happens:
>>>>
>>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20,
>>>> 80) and [30, 90) and [40, 100)
>>>>  - For each of those windows the output watermark hold is set to 42
>>>> (the element's timestamp)
>>>>  - At time 50 the aggregation (A) over the first window is emitted; the
>>>> other windows remain buffered and held
>>>>  - The element arrives at aggregation (B) and is buffered because the
>>>> input watermark (which is the held output watermark from A) is still 42,
>>>> even though no other data will arrive for that window (WLOG if elements
>>>> from other keys are shuffled in)
>>>>  - The input watermark for aggregation (B) does not advance past 42
>>>> until the [40, 100) window is fired and releases its watermark hold
>>>>
>>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed
>>>> by the window size, but by the difference between the end-of-window timestamps of
>>>> all assigned windows (window size minus slide?)
>>>>
>>>> So to avoid this, what actually happens in Java today is that the
>>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>>> not overlap the prior window. Timestamp of 50 is very nonintuitive since
>>>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
>>>> important role in CoGBK-based joins in SQL, where the iterables are
>>>> re-exploded with timestamps that may be the minimum of input elements. This
>>>> shifting may actually break SQL...
>>>>
>>>> This predated our switch away from "delta from watermark" late data
>>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>>> to set a hold that does not make data late or droppable but still uses the
>>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>>> the answer.
>>>>
>>>
>>> This is, indeed, a very tough question...
>>>
>>> I'd say this is generally a problem with EARLIEST and non-aligned
>>> windows. E.g. for sessions, a key with a long session can hold up the
>>> watermark for all others. Here we "know" what the holdup is, and can adjust for it. But I
>>> don't think doing this adjustment is the right thing. It would certainly
>>> seem to mess up the timestamp of the outputs from a join. And it's possible
>>> that the values get re-windowed in which case this element should get
>>> joined with itself from a later window (which I'll admit is a bit odd, but
>>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>>> is non-local).
>>>
>>> Logically, the reason we want the [-10, 50) window for B to fire shortly after
>>> the input watermark for A passes 50 is that no non-late data coming out of
>>> A could influence it. In some sense, the "watermark" for the [-10, 50)
>>> windows has indeed passed, but not that for later windows. I don't think
>>> the Beam model requires that we have a single watermark, just that we fire
>>> triggers/timers once we have seen all the on-time data that we think we
>>> could, and a runner could be smart about this.
>>>
>>> We may want to keep the ability to shift timestamps for WindowFns, but I
>>> think we shouldn't be doing so for the default sliding windows. Correctness
>>> (of output timestamps) over latency unless one asks otherwise.
>>>
>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>>> To avoid this problem, element x in window w will have its timestamp
>>>>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>>>>> fixes the watermark hold problem but introduces a strange output with a
>>>>>> mysterious timestamp that is hard to justify.
>>>>>>
>>>>>> Any other ideas?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Robert Bradshaw <ro...@google.com>.
The default timestamp combiner used to be earliest as well.

On Fri, Feb 12, 2021 at 12:10 AM Reuven Lax <re...@google.com> wrote:

> IIRC, this was introduced because at the time users complained that
> sliding windows were virtually unusable for reasonably-sized windows.
> However this was before we allowed customizing the timestamp combiner, so
> maybe this is less of a problem now?
>
> On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>>
>>>
>>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>>> discussion of a very old and strange feature of Beam that I think we should
>>>>> revisit.
>>>>>
>>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>>
>>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>>> inputs
>>>>>  - there is another downstream aggregation/GBK
>>>>>
>>>>> The output watermark of the upstream aggregation is held to the
>>>>> minimum of the inputs. When an output is emitted, we desire the output to
>>>>> flow through the rest of the pipeline without delay. However, the
>>>>> downstream aggregation can (and often will) be delayed by the window size
>>>>> because of *watermark holds in other later windows that are not
>>>>> released until those windows output.*
>>>>>
>>>> Could you describe this a bit more? Why would later windows hold up the
>>>> watermark for upstream steps? (Is it due to some subtlety, such as tracking
>>>> the watermark for each stage, rather than for each step?)
>>>>
>>>
>>> It does not have to do with stages/fusion (a runner-specific concept)
>>> but is a necessity of watermarks being per-PCollection.
>>>
>>> Suppose:
>>>
>>>  - Default triggering
>>>  - Timestamp combiner EARLIEST
>>>  - 60s windows sliding every 10s
>>>  - An element with timestamp 42
>>>  - Aggregation (A) with downstream aggregation (B)
>>>
>>> Here is what happens:
>>>
>>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20,
>>> 80) and [30, 90) and [40, 100)
>>>  - For each of those windows the output watermark hold is set to 42 (the
>>> element's timestamp)
>>>  - At time 50 the aggregation (A) over the first window is emitted; the
>>> other windows remain buffered and held
>>>  - The element arrives at aggregation (B) and is buffered because the
>>> input watermark (which is the held output watermark from A) is still 42,
>>> even though no other data will arrive for that window (WLOG if elements
>>> from other keys are shuffled in)
>>>  - The input watermark for aggregation (B) does not advance past 42
>>> until the [40, 100) window is fired and releases its watermark hold
>>>
>>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
>>> the window size, but by the difference between the end-of-window timestamps of all
>>> assigned windows (window size minus slide?)
>>>
>>> So to avoid this, what actually happens in Java today is that the
>>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>>> not overlap the prior window. Timestamp of 50 is very nonintuitive since
>>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
>>> important role in CoGBK-based joins in SQL, where the iterables are
>>> re-exploded with timestamps that may be the minimum of input elements. This
>>> shifting may actually break SQL...
>>>
>>> This predated our switch away from "delta from watermark" late data
>>> dropping to "window expiry" data dropping. So maybe there is some new way
>>> to set a hold that does not make data late or droppable but still uses the
>>> EARLIEST timestamp. That is my question, for which I have not figured out
>>> the answer.
>>>
>>
>> This is, indeed, a very tough question...
>>
>> I'd say this is generally a problem with EARLIEST and non-aligned
>> windows. E.g. for sessions, a key with a long session can hold up the
>> watermark for all others. Here we "know" what the holdup is, and can adjust for it. But I
>> don't think doing this adjustment is the right thing. It would certainly
>> seem to mess up the timestamp of the outputs from a join. And it's possible
>> that the values get re-windowed in which case this element should get
>> joined with itself from a later window (which I'll admit is a bit odd, but
>> maybe a reflection that multiple-windowing, like multi-firing triggering,
>> is non-local).
>>
>> Logically, the reason we want the [-10, 50) window for B to fire shortly after
>> the input watermark for A passes 50 is that no non-late data coming out of
>> A could influence it. In some sense, the "watermark" for the [-10, 50)
>> windows has indeed passed, but not that for later windows. I don't think
>> the Beam model requires that we have a single watermark, just that we fire
>> triggers/timers once we have seen all the on-time data that we think we
>> could, and a runner could be smart about this.
>>
>> We may want to keep the ability to shift timestamps for WindowFns, but I
>> think we shouldn't be doing so for the default sliding windows. Correctness
>> (of output timestamps) over latency unless one asks otherwise.
>>
>>
>>> Kenn
>>>
>>>
>>>>
>>>>> To avoid this problem, element x in window w will have its timestamp
>>>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>>>> fixes the watermark hold problem but introduces a strange output with a
>>>>> mysterious timestamp that is hard to justify.
>>>>>
>>>>> Any other ideas?
>>>>>
>>>>> Kenn
>>>>>
>>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Reuven Lax <re...@google.com>.
IIRC, this was introduced because at the time users complained that sliding
windows were virtually unusable for reasonably-sized windows. However this
was before we allowed customizing the timestamp combiner, so maybe this is
less of a problem now?

On Thu, Feb 11, 2021 at 10:53 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org>
>>> wrote:
>>>
>>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>>> discussion of a very old and strange feature of Beam that I think we should
>>>> revisit.
>>>>
>>>> The WindowFn has the ability to shift timestamps forward in order to
>>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>>
>>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the
>>>> inputs
>>>>  - there is another downstream aggregation/GBK
>>>>
>>>> The output watermark of the upstream aggregation is held to the minimum
>>>> of the inputs. When an output is emitted, we desire the output to flow
>>>> through the rest of the pipeline without delay. However, the downstream
>>>> aggregation can (and often will) be delayed by the window size because of *watermark
>>>> holds in other later windows that are not released until those windows
>>>> output.*
>>>>
>>> Could you describe this a bit more? Why would later windows hold up the
>>> watermark for upstream steps? (Is it due to some subtlety, such as tracking
>>> the watermark for each stage, rather than for each step?)
>>>
>>
>> It does not have to do with stages/fusion (a runner-specific concept) but
>> is a necessity of watermarks being per-PCollection.
>>
>> Suppose:
>>
>>  - Default triggering
>>  - Timestamp combiner EARLIEST
>>  - 60s windows sliding every 10s
>>  - An element with timestamp 42
>>  - Aggregation (A) with downstream aggregation (B)
>>
>> Here is what happens:
>>
>>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80)
>> and [30, 90) and [40, 100)
>>  - For each of those windows the output watermark hold is set to 42 (the
>> element's timestamp)
>>  - At time 50 the aggregation (A) over the first window is emitted; the
>> other windows remain buffered and held
>>  - The element arrives at aggregation (B) and is buffered because the
>> input watermark (which is the held output watermark from A) is still 42,
>> even though no other data will arrive for that window (WLOG if elements
>> from other keys are shuffled in)
>>  - The input watermark for aggregation (B) does not advance past 42 until
>> the [40, 100) window is fired and releases its watermark hold
>>
>> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
>> the window size, but by the difference between the end-of-window timestamps of all
>> assigned windows (window size minus slide?)
>>
>> So to avoid this, what actually happens in Java today is that the
>> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
>> not overlap the prior window. Timestamp of 50 is very nonintuitive since
>> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
>> important role in CoGBK-based joins in SQL, where the iterables are
>> re-exploded with timestamps that may be the minimum of input elements. This
>> shifting may actually break SQL...
>>
>> This predated our switch away from "delta from watermark" late data
>> dropping to "window expiry" data dropping. So maybe there is some new way
>> to set a hold that does not make data late or droppable but still uses the
>> EARLIEST timestamp. That is my question, for which I have not figured out
>> the answer.
>>
>
> This is, indeed, a very tough question...
>
> I'd say this is generally a problem with EARLIEST and non-aligned windows.
> E.g. for sessions, a key with a long session can hold up the watermark for
> all others. Here we "know" what the holdup is, and can adjust for it. But I don't
> think doing this adjustment is the right thing. It would certainly seem to
> mess up the timestamp of the outputs from a join. And it's possible that
> the values get re-windowed in which case this element should get joined
> with itself from a later window (which I'll admit is a bit odd, but maybe a
> reflection that multiple-windowing, like multi-firing triggering, is
> non-local).
>
> Logically, the reason we want the [-10, 50) window for B to fire shortly after
> the input watermark for A passes 50 is that no non-late data coming out of
> A could influence it. In some sense, the "watermark" for the [-10, 50)
> windows has indeed passed, but not that for later windows. I don't think
> the Beam model requires that we have a single watermark, just that we fire
> triggers/timers once we have seen all the on-time data that we think we
> could, and a runner could be smart about this.
>
> We may want to keep the ability to shift timestamps for WindowFns, but I
> think we shouldn't be doing so for the default sliding windows. Correctness
> (of output timestamps) over latency unless one asks otherwise.
>
>
>> Kenn
>>
>>
>>>
>>>> To avoid this problem, element x in window w will have its timestamp
>>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>>> fixes the watermark hold problem but introduces a strange output with a
>>>> mysterious timestamp that is hard to justify.
>>>>
>>>> Any other ideas?
>>>>
>>>> Kenn
>>>>
>>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Feb 10, 2021 at 8:03 PM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:
>
>>
>>
>> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>>> discussion of a very old and strange feature of Beam that I think we should
>>> revisit.
>>>
>>> The WindowFn has the ability to shift timestamps forward in order to
>>> unblock downstream watermarks. Why? Specifically in this situation:
>>>
>>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>>  - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>>  - there is another downstream aggregation/GBK
>>>
>>> The output watermark of the upstream aggregation is held to the minimum
>>> of the inputs. When an output is emitted, we desire the output to flow
>>> through the rest of the pipeline without delay. However, the downstream
>>> aggregation can (and often will) be delayed by the window size because of *watermark
>>> holds in other later windows that are not released until those windows
>>> output.*
>>>
>> Could you describe this a bit more? Why would later windows hold up the
>> watermark for upstream steps? (Is it due to some subtlety, such as tracking
>> the watermark for each stage, rather than for each step?)
>>
>
> It does not have to do with stages/fusion (a runner-specific concept) but
> is a necessity of watermarks being per-PCollection.
>
> Suppose:
>
>  - Default triggering
>  - Timestamp combiner EARLIEST
>  - 60s windows sliding every 10s
>  - An element with timestamp 42
>  - Aggregation (A) with downstream aggregation (B)
>
> Here is what happens:
>
>  - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80)
> and [30, 90) and [40, 100)
>  - For each of those windows the output watermark hold is set to 42 (the
> element's timestamp)
>  - At time 50 the aggregation (A) over the first window is emitted; the
> other windows remain buffered and held
>  - The element arrives at aggregation (B) and is buffered because the
> input watermark (which is the held output watermark from A) is still 42,
> even though no other data will arrive for that window (WLOG if elements
> from other keys are shuffled in)
>  - The input watermark for aggregation (B) does not advance past 42 until
> the [40, 100) window is fired and releases its watermark hold
>
> It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
> the window size, but by the difference between the end-of-window timestamps of all
> assigned windows (window size minus slide?)
>
> So to avoid this, what actually happens in Java today is that the
> watermark hold, and output timestamp, is set not to 42 but altered to 50 to
> not overlap the prior window. Timestamp of 50 is very nonintuitive since
> you asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
> important role in CoGBK-based joins in SQL, where the iterables are
> re-exploded with timestamps that may be the minimum of input elements. This
> shifting may actually break SQL...
>
> This predated our switch away from "delta from watermark" late data
> dropping to "window expiry" data dropping. So maybe there is some new way
> to set a hold that does not make data late or droppable but still uses the
> EARLIEST timestamp. That is my question, for which I have not figured out
> the answer.
>

This is, indeed, a very tough question...

I'd say this is generally a problem with EARLIEST and non-aligned windows.
E.g. for sessions, a key with a long session can hold up the watermark for
all others. Here we "know" what the holdup is, and can adjust for it. But I don't
think doing this adjustment is the right thing. It would certainly seem to
mess up the timestamp of the outputs from a join. And it's possible that
the values get re-windowed in which case this element should get joined
with itself from a later window (which I'll admit is a bit odd, but maybe a
reflection that multiple-windowing, like multi-firing triggering, is
non-local).
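A rough Python sketch of the sessions case, assuming EARLIEST holds, a 10s gap, whole-second times, and that a session fires and releases its hold once the input watermark passes its end. The key names and timestamps are made up for illustration; this models the watermark arithmetic only, not Beam's Sessions implementation.

```python
GAP = 10  # session gap duration in seconds (illustrative)


def merge_sessions(timestamps, gap=GAP):
    """Merge per-element proto-windows [t, t + gap) into sessions."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1]:
            start, end = sessions[-1]
            sessions[-1] = (start, max(end, t + gap))  # extend the open session
        else:
            sessions.append((t, t + gap))  # gap exceeded: start a new session
    return sessions


def output_watermark(input_wm, events_by_key):
    """Min of the input watermark and the EARLIEST hold of every still-open
    session; sessions ending at or before input_wm have fired and released."""
    holds = [
        start  # EARLIEST hold = timestamp of the session's first element
        for timestamps in events_by_key.values()
        for start, end in merge_sessions(timestamps)
        if end > input_wm
    ]
    return min([input_wm, *holds])


events = {
    "busy": list(range(0, 71, 5)),  # an element every 5s keeps one session open
    "quiet": [50],                  # a short session [50, 60)
}
print(merge_sessions(events["busy"]), merge_sessions(events["quiet"]))
print(output_watermark(70, events))
```

Here the quiet key's session [50, 60) has already fired, yet the output watermark stays at 0 because the busy key's session [0, 80) is still open and holding at its first element.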

Logically, the reason we want the [-10, 50) window for B to fire shortly after
the input watermark for A passes 50 is that no non-late data coming out of
A could influence it. In some sense, the "watermark" for the [-10, 50)
windows has indeed passed, but not that for later windows. I don't think
the Beam model requires that we have a single watermark, just that we fire
triggers/timers once we have seen all the on-time data that we think we
could, and a runner could be smart about this.
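One way to read this suggestion, sketched in Python: instead of a single watermark for B, compute a hypothetical watermark per window that only consults A's hold for that same window. Neither function is an existing Beam or runner API. The per-window version is only sound if B keeps A's windowing; if the values are re-windowed, A's later windows can emit into B's [-10, 50) window and their holds matter again.

```python
def global_watermark(input_wm, holds):
    """Single watermark for B: held by every outstanding hold in A."""
    return min([input_wm, *holds.values()])


def per_window_watermark(input_wm, holds, window):
    """Hypothetical: a watermark for one window of B that only consults A's
    hold for the same window. Assumes B does not re-window, so A's other
    windows can never emit into this one."""
    own = [hold for w, hold in holds.items() if w == window]
    return min([input_wm, *own])


# State of A right after its input watermark reaches 50 in Kenn's example:
# the [-10, 50) pane has fired; the five later windows hold at 42 (EARLIEST).
holds = {(s, s + 60): 42 for s in range(0, 41, 10)}
input_wm = 50

print(global_watermark(input_wm, holds))                 # 42
print(per_window_watermark(input_wm, holds, (-10, 50)))  # 50
print(per_window_watermark(input_wm, holds, (0, 60)))    # 42
```

Under this reading B's [-10, 50) window could fire at 50 (its own watermark has reached its end), while B's later windows still wait on A's holds at 42.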

We may want to keep the ability to shift timestamps for WindowFns, but I
think we shouldn't be doing so for the default sliding windows. Correctness
(of output timestamps) over latency unless one asks otherwise.


> Kenn
>
>
>>
>>> To avoid this problem, element x in window w will have its timestamp
>>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>>> fixes the watermark hold problem but introduces a strange output with a
>>> mysterious timestamp that is hard to justify.
>>>
>>> Any other ideas?
>>>
>>> Kenn
>>>
>>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Kenneth Knowles <ke...@apache.org>.
On Wed, Feb 10, 2021 at 2:24 PM Alex Amato <aj...@google.com> wrote:

>
>
> On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> On a PR (https://github.com/apache/beam/pull/13927) we got into a
>> discussion of a very old and strange feature of Beam that I think we should
>> revisit.
>>
>> The WindowFn has the ability to shift timestamps forward in order to
>> unblock downstream watermarks. Why? Specifically in this situation:
>>
>>  - aggregation/GBK with overlapping windows like SlidingWindows
>>  - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>>  - there is another downstream aggregation/GBK
>>
>> The output watermark of the upstream aggregation is held to the minimum
>> of the inputs. When an output is emitted, we desire the output to flow
>> through the rest of the pipeline without delay. However, the downstream
>> aggregation can (and often will) be delayed by the window size because of *watermark
>> holds in other later windows that are not released until those windows
>> output.*
>>
> Could you describe this a bit more? Why would later windows hold up the
> watermark for upstream steps? (Is it due to some subtlety, such as tracking
> the watermark for each stage, rather than for each step?)
>

It does not have to do with stages/fusion (a runner-specific concept) but
is a necessity of watermarks being per-PCollection.

Suppose:

 - Default triggering
 - Timestamp combiner EARLIEST
 - 60s windows sliding every 10s
 - An element with timestamp 42
 - Aggregation (A) with downstream aggregation (B)

Here is what happens:

 - The element falls into [-10, 50) and [0, 60) and [10, 70) and [20, 80)
and [30, 90) and [40, 100)
 - For each of those windows the output watermark hold is set to 42 (the
element's timestamp)
 - At time 50 the aggregation (A) over the first window is emitted; the
other windows remain buffered and held
 - The element arrives at aggregation (B) and is buffered because the input
watermark (which is the held output watermark from A) is still 42, even
though no other data will arrive for that window (WLOG if elements from
other keys are shuffled in)
 - The input watermark for aggregation (B) does not advance past 42 until
the [40, 100) window is fired and releases its watermark hold
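The steps above can be replayed with a small Python model (not Beam code). It assumes whole-second times, window starts aligned to multiples of the period (offset 0), windows as half-open (start, end) tuples, and that each of A's windows fires and releases its hold as soon as A's input watermark reaches the window's end. The helper names are illustrative.

```python
SIZE, PERIOD = 60, 10


def assign_sliding_windows(ts, size=SIZE, period=PERIOD):
    """All (start, end) windows with period-aligned starts that contain ts."""
    starts = []
    start = ts - ts % period  # latest aligned start <= ts
    while start > ts - size:  # [start, start + size) still contains ts
        starts.append(start)
        start -= period
    return [(s, s + size) for s in reversed(starts)]


def output_watermark(input_wm, holds):
    """Output watermark = min(input watermark, all outstanding holds)."""
    return min([input_wm, *holds.values()])


def trace_downstream_watermark(ts):
    """Advance A's input watermark to each window end in turn; each window
    fires and releases its hold when its end is reached. Returns pairs of
    (A's input watermark, A's output watermark = B's input watermark)."""
    holds = {w: ts for w in assign_sliding_windows(ts)}  # EARLIEST: hold at ts
    history = []
    for wm in sorted(end for _, end in holds):
        for w in [w for w in holds if w[1] <= wm]:
            del holds[w]  # pane fires, hold released
        history.append((wm, output_watermark(wm, holds)))
    return history


print(assign_sliding_windows(42))
print(trace_downstream_watermark(42))
```

The first line lists the six windows from the example; the second shows B's input watermark pinned at 42 for A's input watermarks 50 through 90, jumping to 100 only when the [40, 100) window releases its hold.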

It is, indeed, subtle. To me, anyhow. I was wrong - it is not delayed by
the window size, but by the difference between the end-of-window timestamps of all
assigned windows (window size minus slide?)
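A quick numeric check of the "window size minus slide" guess, using the same simplified model (whole seconds, period-aligned starts, half-open windows). It assumes the window size is a multiple of the period. The sketch doesn't cover other cases, where the number of windows per element can depend on the timestamp.

```python
def assign_sliding_windows(ts, size, period):
    """All (start, end) windows with period-aligned starts that contain ts."""
    starts = []
    start = ts - ts % period  # latest aligned start <= ts
    while start > ts - size:  # [start, start + size) still contains ts
        starts.append(start)
        start -= period
    return [(s, s + size) for s in reversed(starts)]


def hold_delay(ts, size, period):
    """Gap between the first and last end-of-window among ts's windows: how
    long an EARLIEST hold in the last window can pin the downstream watermark
    after the first window has fired."""
    ends = [end for _, end in assign_sliding_windows(ts, size, period)]
    return max(ends) - min(ends)


for size, period in [(60, 10), (60, 20), (30, 5)]:
    print(size, period, hold_delay(42, size, period), size - period)
```

Each printed row shows the measured gap next to size - period; for these three configurations they agree (50, 40 and 25).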

So to avoid this, what actually happens in Java today is that the watermark
hold, and output timestamp, is set not to 42 but altered to 50 to not
overlap the prior window. Timestamp of 50 is very nonintuitive since you
asked for the EARLIEST of input timestamps. EARLIEST combiner plays an
important role in CoGBK-based joins in SQL, where the iterables are
re-exploded with timestamps that may be the minimum of input elements. This
shifting may actually break SQL...
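A sketch of the shifting behavior in the same simplified model. The rule used here, output time = max(element timestamp, end of the immediately preceding sliding window), is an assumption modeled on the description "shifted to not overlap with any earlier windows"; Beam Java's SlidingWindows works at millisecond precision and its exact arithmetic may differ.

```python
SIZE, PERIOD = 60, 10


def assign_sliding_windows(ts, size=SIZE, period=PERIOD):
    """All (start, end) windows with period-aligned starts that contain ts."""
    starts = []
    start = ts - ts % period  # latest aligned start <= ts
    while start > ts - size:  # [start, start + size) still contains ts
        starts.append(start)
        start -= period
    return [(s, s + size) for s in reversed(starts)]


def shifted_output_time(ts, window, period=PERIOD):
    """Assumed shift rule: never earlier than the end of the previous
    overlapping window (this window's end minus one period)."""
    _, end = window
    return max(ts, end - period)


def trace_downstream_watermark(ts):
    """Same replay as before, but holds use the shifted output times."""
    holds = {w: shifted_output_time(ts, w) for w in assign_sliding_windows(ts)}
    history = []
    for wm in sorted(end for _, end in holds):
        for w in [w for w in holds if w[1] <= wm]:
            del holds[w]  # pane fires, hold released
        history.append((wm, min([wm, *holds.values()])))
    return history


print([shifted_output_time(42, w) for w in assign_sliding_windows(42)])
print(trace_downstream_watermark(42))
```

Under this rule the holds become 42, 50, 60, 70, 80 and 90, and B's input watermark keeps pace with A's input watermark, at the price of outputs whose timestamps are no longer the EARLIEST input timestamp.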

This predated our switch away from "delta from watermark" late data
dropping to "window expiry" data dropping. So maybe there is some new way
to set a hold that does not make data late or droppable but still uses the
EARLIEST timestamp. That is my question, for which I have not figured out
the answer.

Kenn


>
>> To avoid this problem, element x in window w will have its timestamp
>> shifted to not overlap with any earlier windows. It is a weird behavior. It
>> fixes the watermark hold problem but introduces a strange output with a
>> mysterious timestamp that is hard to justify.
>>
>> Any other ideas?
>>
>> Kenn
>>
>

Re: Can we solve WindowFn.getOutputTime another way?

Posted by Alex Amato <aj...@google.com>.
On Wed, Feb 10, 2021 at 12:14 PM Kenneth Knowles <ke...@apache.org> wrote:

> On a PR (https://github.com/apache/beam/pull/13927) we got into a
> discussion of a very old and strange feature of Beam that I think we should
> revisit.
>
> The WindowFn has the ability to shift timestamps forward in order to
> unblock downstream watermarks. Why? Specifically in this situation:
>
>  - aggregation/GBK with overlapping windows like SlidingWindows
>  - timestamp combiner of the aggregated outputs is EARLIEST of the inputs
>  - there is another downstream aggregation/GBK
>
> The output watermark of the upstream aggregation is held to the minimum of
> the inputs. When an output is emitted, we desire the output to flow through
> the rest of the pipeline without delay. However, the downstream aggregation
> can (and often will) be delayed by the window size because of *watermark
> holds in other later windows that are not released until those windows
> output.*
>
Could you describe this a bit more? Why would later windows hold up the
watermark for upstream steps? (Is it due to some subtlety, such as tracking
the watermark for each stage, rather than for each step?)


>
> To avoid this problem, element x in window w will have its timestamp
> shifted to not overlap with any earlier windows. It is a weird behavior. It
> fixes the watermark hold problem but introduces a strange output with a
> mysterious timestamp that is hard to justify.
>
> Any other ideas?
>
> Kenn
>