You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Kenneth Knowles <ke...@apache.org> on 2022/01/18 14:42:33 UTC

Re: Default output timestamp of processing-time timers

On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org> wrote:

> > I think this wouldn't be very robust to different situations where
> processing time and event time may not be that close to each other.
>
> if you do something like `min(endOfWindow, max(eventInputTimestamp,
> computedFiringTimestamp))` the worst case is that you set a watermark hold
> for somewhere in the future, right?  For example, if the watermark is
> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
> timestamps "newer" than the input, but shouldn't ever create late data,
> correct?
>
> Also, imo, the timestamps really already cross domains now, because the
> watermark (event time) is held until the (processing time) timer fires.
>
> The concrete issue that brought this up was a pipeline with some state,
> and the state was "cleaned up" periodically with a processing time timer
> that fired every ~hour.  The author of the pipeline was confused why the
> watermark wasn't moving (and thus GBKs firing, etc).  The root cause was
> the watermark being held by the timer.
>
> > It would just save you .withOutputTimestamp(elementTimestamp) on your
> calls to setting the event time timer, right?
>
> Correct, the main thing I'm trying to solve is having to recalculate an
> output timestamp using the same logic that the timer itself is using to set
> its firing timestamp.
>

It sounds like the main use case that you are dealing with is the case
where the timer doesn't actually produce output (or set further timers that
produce output) so it doesn't need (or want) a watermark hold. That makes
sense.

In fact, I do not view a "watermark hold" as a fundamental concept. The act
of "set a timer with the intent that I am allowed to produce output with
timestamp X" is the fundamental concept, and watermark hold is an
implementation detail that should really never have been surfaced as an
end-user concept, or really even as an SDK author concept. This is why in
my proposal for adding output timestamps to timers, I called it
"withOutputTimestamp", and this is why the design does not include any
watermark holds - there is a self-loop on a transform where timers produce
an input watermark distinct from the watermark on input elements, and that
is enough. There is not now, and never has been, a need for the concept of
a hold at the level of the Beam model.

I wonder if we can automate this behavior by noticing that there is no
OutputReceiver parameters to the timer callback, and also transitively. Or
just work around it by saying ".withoutOutput" on the timer.

Kenn


>
>
>
> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org> wrote:
>>
>>> If I have a processing time timer, is there any way to automatically set
>>> the output timestamp to the timer firing timestamp (similar to how
>>> event-time timers work).
>>>
>>> A common use case would be to do something like:
>>> timer.offset(X).align(Y).setRelative()
>>>
>>
>>
>> but have the output timestamp be the firing timestamp.  In order to do
>>> this now you need to re-calculate the output timestamp (using the same
>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>
>>
>> I think this wouldn't be very robust to different situations where
>> processing time and event time may not be that close to each other. In
>> general I'm skeptical of reusing timestamps across time domains, for just
>> this sort of reason. I wouldn't recommend doing this manually either.
>>
>>
>>> I'm not sure what the API would look like here, but it would also be
>>> nice to allow event-time timers to do the same in reverse (use the element
>>> input timestamp rather than the firing timestamp).  Maybe something like
>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>> ELEMENT_TIMESTAMP?
>>>
>>
>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>> calls to setting the event time timer, right? It doesn't work in general
>> because a timer can be set from other OnTimer methods, where there is no
>> "element" per se, but just the output timestamp of the fired timer.
>>
>> Kenn
>>
>

Re: Default output timestamp of processing-time timers

Posted by Kenneth Knowles <ke...@apache.org>.
On Wed, Jan 19, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:

> > One note - some people definitely use timer.withOutputTimestamp as a
> watermark hold.
>
> Definitely.
>
> > In fact, I do not view a "watermark hold" as a fundamental concept. The
> act of "set a timer with the intent that I am allowed to produce output
> with timestamp X" is the fundamental concept, and watermark hold is an
> implementation detail that should really never have been surfaced as an
> end-user concept, or really even as an SDK author concept.
>
> Agree that this need not be exposed explicitly, but the given the
> causality-preserving invariant that elements arriving *before* watermark
> *must not* leave after watermark I think that .withOutputTimestamp actually
> defines watermark hold implicitly. I think there is no other valid
> implementation than to hold output watermark not to cross the output
> timestamp of any active per-key timer (actually, we could distinguish cases
> when the timer is set for already late elements, there is no need - or
> possibility - to hold the watermark).
>
> I'd be also supportive for associating any buffer output timestamp with
> timer, rather than the buffer itself, as that really feels like a better
> description of what is *really* going to happen.
>
Is this just a way to connect the state, timer callback, and process
element. I wonder how it looks different or what we could do better with
this information. (I like these sorts of ideas, but I can't think of how it
would be different)

In the case Reuven described, where the timer callback does nothing, there
seems to be a real risk that data is left behind in the buffer when the
watermark hold is released. So you could, for example, have a timer
callback that always must accept the full contents of the buffer, and where
it is obvious to a user that the buffer is cleared after the callback. Like
OnWindowExpiration but OnBufferEviction.

> This was probably discussed, but I cannot see this in this discussion,
> what keeps us from setting output timestamp of processing-time timer to
> something like min(endOfWindow, currentOutputWatermark)? Yes, output
> watermark is not stable, but anything that is derived from _processing
> time_ is not stable by definition. For on-time elements, outputWatermark
> gives an estimation of the current position in event-time, so it makes
> sense to me to use that. Are there any counter examples?
>
This seems OK to me. Certainly the hold should never be based on processing
time.

Kenn

>  Jan
> On 1/18/22 21:10, Kenneth Knowles wrote:
>
> Yea, it makes sense. This is an issue for the global window where there
> isn't automatic cleanup of state. I've had a few user cases where they
> would like a good way of doing state cleanup in the global window too -
> something where whenever state gets buffer there is always a finite timer
> that will fire. There might be an opportunity here, if we attach the hold
> to that associated timer rather than the state. It sounds similar to what
> you describe where someone made a timer just to create a watermark hold
> associated with some state - I assume they actually do need to process and
> emit that state in some way related to the timer.
>
> On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:
>
>> Correct.
>>
>> IIRC originally we didn't want to add "buffered data timestamps"
>> because it was error prone. Leaking even one record in state holds up the
>> watermark and can cause the entire pipeline to grind to a halt. Associating
>> with a timer guarantees that holds are always cleared eventually.
>>
>> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> This is an interesting case, and a legitimate counterexample to
>>> consider. I'd call it a workaround :-). The semantic thing they would
>>> want/need is "output timestamp" associated with buffered data (also
>>> implemented with watermark hold). I do know systems that designed their
>>> state with this built in.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> One note - some people definitely use timer.withOutputTimestamp as a
>>>> watermark hold.
>>>>
>>>
>>>> This is a scenario in which one outputs (from processElement) a
>>>> timestamp behind the current input element timestamp but knows that it is
>>>> safe because there is already an extent timer with an earlier
>>>> output timestamp (state can be used for this). In this case I've seen
>>>> timers set simply for the hold - the actual onTimer never outputs anything.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> > I think this wouldn't be very robust to different situations where
>>>>>> processing time and event time may not be that close to each other.
>>>>>>
>>>>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>>>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>>>>> for somewhere in the future, right?  For example, if the watermark is
>>>>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>>>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>>>>>> timestamps "newer" than the input, but shouldn't ever create late data,
>>>>>> correct?
>>>>>>
>>>>>> Also, imo, the timestamps really already cross domains now, because
>>>>>> the watermark (event time) is held until the (processing time) timer fires.
>>>>>>
>>>>>> The concrete issue that brought this up was a pipeline with some
>>>>>> state, and the state was "cleaned up" periodically with a processing time
>>>>>> timer that fired every ~hour.  The author of the pipeline was confused why
>>>>>> the watermark wasn't moving (and thus GBKs firing, etc).  The root cause
>>>>>> was the watermark being held by the timer.
>>>>>>
>>>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>>> your calls to setting the event time timer, right?
>>>>>>
>>>>>> Correct, the main thing I'm trying to solve is having to recalculate
>>>>>> an output timestamp using the same logic that the timer itself is using to
>>>>>> set its firing timestamp.
>>>>>>
>>>>>
>>>>> It sounds like the main use case that you are dealing with is the case
>>>>> where the timer doesn't actually produce output (or set further timers that
>>>>> produce output) so it doesn't need (or want) a watermark hold. That makes
>>>>> sense.
>>>>>
>>>>> In fact, I do not view a "watermark hold" as a fundamental concept.
>>>>> The act of "set a timer with the intent that I am allowed to produce output
>>>>> with timestamp X" is the fundamental concept, and watermark hold is an
>>>>> implementation detail that should really never have been surfaced as an
>>>>> end-user concept, or really even as an SDK author concept. This is why in
>>>>> my proposal for adding output timestamps to timers, I called it
>>>>> "withOutputTimestamp", and this is why the design does not include any
>>>>> watermark holds - there is a self-loop on a transform where timers produce
>>>>> an input watermark distinct from the watermark on input elements, and that
>>>>> is enough. There is not now, and never has been, a need for the concept of
>>>>> a hold at the level of the Beam model.
>>>>>
>>>>> I wonder if we can automate this behavior by noticing that there is no
>>>>> OutputReceiver parameters to the timer callback, and also transitively. Or
>>>>> just work around it by saying ".withoutOutput" on the timer.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If I have a processing time timer, is there any way to
>>>>>>>> automatically set the output timestamp to the timer firing timestamp
>>>>>>>> (similar to how event-time timers work).
>>>>>>>>
>>>>>>>> A common use case would be to do something like:
>>>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> but have the output timestamp be the firing timestamp.  In order to
>>>>>>>> do this now you need to re-calculate the output timestamp (using the same
>>>>>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>>>>>
>>>>>>>
>>>>>>> I think this wouldn't be very robust to different situations where
>>>>>>> processing time and event time may not be that close to each other. In
>>>>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>>>>
>>>>>>>
>>>>>>>> I'm not sure what the API would look like here, but it would also
>>>>>>>> be nice to allow event-time timers to do the same in reverse (use the
>>>>>>>> element input timestamp rather than the firing timestamp).  Maybe something
>>>>>>>> like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>>>>>> ELEMENT_TIMESTAMP?
>>>>>>>>
>>>>>>>
>>>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>>>> your calls to setting the event time timer, right? It doesn't work in
>>>>>>> general because a timer can be set from other OnTimer methods, where there
>>>>>>> is no "element" per se, but just the output timestamp of the fired timer.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>

Re: Default output timestamp of processing-time timers

Posted by Jan Lukavský <je...@seznam.cz>.
 > One note - some people definitely use timer.withOutputTimestamp as a 
watermark hold.

Definitely.

 > In fact, I do not view a "watermark hold" as a fundamental concept. 
The act of "set a timer with the intent that I am allowed to produce 
output with timestamp X" is the fundamental concept, and watermark hold 
is an implementation detail that should really never have been surfaced 
as an end-user concept, or really even as an SDK author concept.

Agree that this need not be exposed explicitly, but the given the 
causality-preserving invariant that elements arriving *before* watermark 
*must not* leave after watermark I think that .withOutputTimestamp 
actually defines watermark hold implicitly. I think there is no other 
valid implementation than to hold output watermark not to cross the 
output timestamp of any active per-key timer (actually, we could 
distinguish cases when the timer is set for already late elements, there 
is no need - or possibility - to hold the watermark).

I'd be also supportive for associating any buffer output timestamp with 
timer, rather than the buffer itself, as that really feels like a better 
description of what is *really* going to happen.

This was probably discussed, but I cannot see this in this discussion, 
what keeps us from setting output timestamp of processing-time timer to 
something like min(endOfWindow, currentOutputWatermark)? Yes, output 
watermark is not stable, but anything that is derived from _processing 
time_ is not stable by definition. For on-time elements, outputWatermark 
gives an estimation of the current position in event-time, so it makes 
sense to me to use that. Are there any counter examples?

  Jan

On 1/18/22 21:10, Kenneth Knowles wrote:
> Yea, it makes sense. This is an issue for the global window where 
> there isn't automatic cleanup of state. I've had a few user cases 
> where they would like a good way of doing state cleanup in the global 
> window too - something where whenever state gets buffer there is 
> always a finite timer that will fire. There might be an opportunity 
> here, if we attach the hold to that associated timer rather than the 
> state. It sounds similar to what you describe where someone made a 
> timer just to create a watermark hold associated with some state - I 
> assume they actually do need to process and emit that state in some 
> way related to the timer.
>
> On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:
>
>     Correct.
>
>     IIRC originally we didn't want to add "buffered data timestamps"
>     because it was error prone. Leaking even one record in state
>     holds up the watermark and can cause the entire pipeline to grind
>     to a halt. Associating with a timer guarantees that holds are
>     always cleared eventually.
>
>     On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <ke...@apache.org>
>     wrote:
>
>         This is an interesting case, and a legitimate counterexample
>         to consider. I'd call it a workaround :-). The semantic thing
>         they would want/need is "output timestamp" associated with
>         buffered data (also implemented with watermark hold). I do
>         know systems that designed their state with this built in.
>
>         Kenn
>
>         On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com>
>         wrote:
>
>             One note - some people definitely use
>             timer.withOutputTimestamp as a watermark hold.
>
>
>             This is a scenario in which one outputs (from
>             processElement) a timestamp behind the current input
>             element timestamp but knows that it is safe because there
>             is already an extent timer with an earlier
>             output timestamp (state can be used for this). In this
>             case I've seen timers set simply for the hold - the actual
>             onTimer never outputs anything.
>
>             Reuven
>
>             On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles
>             <ke...@apache.org> wrote:
>
>
>
>                 On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz
>                 <sn...@apache.org> wrote:
>
>                     > I think this wouldn't be very robust to
>                     different situations where processing time and
>                     event time may not be that close to each other.
>
>                     if you do something like `min(endOfWindow,
>                     max(eventInputTimestamp,
>                     computedFiringTimestamp))` the worst case is that
>                     you set a watermark hold for somewhere in the
>                     future, right?  For example, if the watermark is
>                     lagging 3 hours, processing time = 4pm, event
>                     input = 1pm, window end = 5pm, the watermark
>                     hold/output time is set to 4pm + T. This would
>                     make the timestamps "newer" than the input, but
>                     shouldn't ever create late data, correct?
>
>                     Also, imo, the timestamps really already cross
>                     domains now, because the watermark (event time) is
>                     held until the (processing time) timer fires.
>
>                     The concrete issue that brought this up was a
>                     pipeline with some state, and the state was
>                     "cleaned up" periodically with a processing time
>                     timer that fired every ~hour.  The author of the
>                     pipeline was confused why the watermark wasn't
>                     moving (and thus GBKs firing, etc).  The root
>                     cause was the watermark being held by the timer.
>
>                     > It would just save you
>                     .withOutputTimestamp(elementTimestamp) on your
>                     calls to setting the event time timer, right?
>
>                     Correct, the main thing I'm trying to solve is
>                     having to recalculate an output timestamp using
>                     the same logic that the timer itself is using to
>                     set its firing timestamp.
>
>
>                 It sounds like the main use case that you are dealing
>                 with is the case where the timer doesn't actually
>                 produce output (or set further timers that produce
>                 output) so it doesn't need (or want) a watermark hold.
>                 That makes sense.
>
>                 In fact, I do not view a "watermark hold" as a
>                 fundamental concept. The act of "set a timer with the
>                 intent that I am allowed to produce output with
>                 timestamp X" is the fundamental concept, and watermark
>                 hold is an implementation detail that should really
>                 never have been surfaced as an end-user concept, or
>                 really even as an SDK author concept. This is why in
>                 my proposal for adding output timestamps to timers, I
>                 called it "withOutputTimestamp", and this is why the
>                 design does not include any watermark holds - there is
>                 a self-loop on a transform where timers produce an
>                 input watermark distinct from the watermark on input
>                 elements, and that is enough. There is not now, and
>                 never has been, a need for the concept of a hold at
>                 the level of the Beam model.
>
>                 I wonder if we can automate this behavior by noticing
>                 that there is no OutputReceiver parameters to the
>                 timer callback, and also transitively. Or just work
>                 around it by saying ".withoutOutput" on the timer.
>
>                 Kenn
>
>
>
>
>                     On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles
>                     <ke...@apache.org> wrote:
>
>
>
>                         On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz
>                         <sn...@apache.org> wrote:
>
>                             If I have a processing time timer, is
>                             there any way to automatically set the
>                             output timestamp to the timer firing
>                             timestamp (similar to how event-time
>                             timers work).
>
>                             A common use case would be to do something
>                             like:
>                             timer.offset(X).align(Y).setRelative()
>
>                             but have the output timestamp be the
>                             firing timestamp.  In order to do this now
>                             you need to re-calculate the output
>                             timestamp (using the same logic as the
>                             timer does internally) and manually use
>                             withOutputTimestamp.
>
>
>                         I think this wouldn't be very robust to
>                         different situations where processing time and
>                         event time may not be that close to each
>                         other. In general I'm skeptical of reusing
>                         timestamps across time domains, for just this
>                         sort of reason. I wouldn't recommend doing
>                         this manually either.
>
>                             I'm not sure what the API would look like
>                             here, but it would also be nice to allow
>                             event-time timers to do the same in
>                             reverse (use the element input timestamp
>                             rather than the firing timestamp).  Maybe
>                             something like
>                             `withDefaultOutputTimestampFrom(...)` and
>                             an enum of FIRING_TIMESTAMP,
>                             ELEMENT_TIMESTAMP?
>
>
>                         It would just save you
>                         .withOutputTimestamp(elementTimestamp) on your
>                         calls to setting the event time timer, right?
>                         It doesn't work in general because a timer can
>                         be set from other OnTimer methods, where there
>                         is no "element" per se, but just the output
>                         timestamp of the fired timer.
>
>                         Kenn
>

Re: Default output timestamp of processing-time timers

Posted by Kenneth Knowles <ke...@apache.org>.
Yea, it makes sense. This is an issue for the global window where there
isn't automatic cleanup of state. I've had a few user cases where they
would like a good way of doing state cleanup in the global window too -
something where whenever state gets buffer there is always a finite timer
that will fire. There might be an opportunity here, if we attach the hold
to that associated timer rather than the state. It sounds similar to what
you describe where someone made a timer just to create a watermark hold
associated with some state - I assume they actually do need to process and
emit that state in some way related to the timer.

On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:

> Correct.
>
> IIRC originally we didn't want to add "buffered data timestamps"
> because it was error prone. Leaking even one record in state holds up the
> watermark and can cause the entire pipeline to grind to a halt. Associating
> with a timer guarantees that holds are always cleared eventually.
>
> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>> This is an interesting case, and a legitimate counterexample to consider.
>> I'd call it a workaround :-). The semantic thing they would want/need is
>> "output timestamp" associated with buffered data (also implemented with
>> watermark hold). I do know systems that designed their state with this
>> built in.
>>
>> Kenn
>>
>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>>
>>> One note - some people definitely use timer.withOutputTimestamp as a
>>> watermark hold.
>>>
>>
>>> This is a scenario in which one outputs (from processElement) a
>>> timestamp behind the current input element timestamp but knows that it is
>>> safe because there is already an extent timer with an earlier
>>> output timestamp (state can be used for this). In this case I've seen
>>> timers set simply for the hold - the actual onTimer never outputs anything.
>>>
>>> Reuven
>>>
>>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>>
>>>>> > I think this wouldn't be very robust to different situations where
>>>>> processing time and event time may not be that close to each other.
>>>>>
>>>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>>>> for somewhere in the future, right?  For example, if the watermark is
>>>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>>>>> timestamps "newer" than the input, but shouldn't ever create late data,
>>>>> correct?
>>>>>
>>>>> Also, imo, the timestamps really already cross domains now, because
>>>>> the watermark (event time) is held until the (processing time) timer fires.
>>>>>
>>>>> The concrete issue that brought this up was a pipeline with some
>>>>> state, and the state was "cleaned up" periodically with a processing time
>>>>> timer that fired every ~hour.  The author of the pipeline was confused why
>>>>> the watermark wasn't moving (and thus GBKs firing, etc).  The root cause
>>>>> was the watermark being held by the timer.
>>>>>
>>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>> your calls to setting the event time timer, right?
>>>>>
>>>>> Correct, the main thing I'm trying to solve is having to recalculate
>>>>> an output timestamp using the same logic that the timer itself is using to
>>>>> set its firing timestamp.
>>>>>
>>>>
>>>> It sounds like the main use case that you are dealing with is the case
>>>> where the timer doesn't actually produce output (or set further timers that
>>>> produce output) so it doesn't need (or want) a watermark hold. That makes
>>>> sense.
>>>>
>>>> In fact, I do not view a "watermark hold" as a fundamental concept. The
>>>> act of "set a timer with the intent that I am allowed to produce output
>>>> with timestamp X" is the fundamental concept, and watermark hold is an
>>>> implementation detail that should really never have been surfaced as an
>>>> end-user concept, or really even as an SDK author concept. This is why in
>>>> my proposal for adding output timestamps to timers, I called it
>>>> "withOutputTimestamp", and this is why the design does not include any
>>>> watermark holds - there is a self-loop on a transform where timers produce
>>>> an input watermark distinct from the watermark on input elements, and that
>>>> is enough. There is not now, and never has been, a need for the concept of
>>>> a hold at the level of the Beam model.
>>>>
>>>> I wonder if we can automate this behavior by noticing that there is no
>>>> OutputReceiver parameters to the timer callback, and also transitively. Or
>>>> just work around it by saying ".withoutOutput" on the timer.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> If I have a processing time timer, is there any way to automatically
>>>>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>>>>> event-time timers work).
>>>>>>>
>>>>>>> A common use case would be to do something like:
>>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>>
>>>>>>
>>>>>>
>>>>>> but have the output timestamp be the firing timestamp.  In order to
>>>>>>> do this now you need to re-calculate the output timestamp (using the same
>>>>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>>>>
>>>>>>
>>>>>> I think this wouldn't be very robust to different situations where
>>>>>> processing time and event time may not be that close to each other. In
>>>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>>>
>>>>>>
>>>>>>> I'm not sure what the API would look like here, but it would also be
>>>>>>> nice to allow event-time timers to do the same in reverse (use the element
>>>>>>> input timestamp rather than the firing timestamp).  Maybe something like
>>>>>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>>>>> ELEMENT_TIMESTAMP?
>>>>>>>
>>>>>>
>>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>>>> calls to setting the event time timer, right? It doesn't work in general
>>>>>> because a timer can be set from other OnTimer methods, where there is no
>>>>>> "element" per se, but just the output timestamp of the fired timer.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>

Re: Default output timestamp of processing-time timers

Posted by Reuven Lax <re...@google.com>.
Correct.

IIRC originally we didn't want to add "buffered data timestamps" because it
was error prone. Leaking even one record in state holds up the watermark
and can cause the entire pipeline to grind to a halt. Associating with a
timer guarantees that holds are always cleared eventually.

On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <ke...@apache.org> wrote:

> This is an interesting case, and a legitimate counterexample to consider.
> I'd call it a workaround :-). The semantic thing they would want/need is
> "output timestamp" associated with buffered data (also implemented with
> watermark hold). I do know systems that designed their state with this
> built in.
>
> Kenn
>
> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>
>> One note - some people definitely use timer.withOutputTimestamp as a
>> watermark hold.
>>
>
>> This is a scenario in which one outputs (from processElement) a timestamp
>> behind the current input element timestamp but knows that it is safe
>> because there is already an extent timer with an earlier output timestamp
>> (state can be used for this). In this case I've seen timers set simply for
>> the hold - the actual onTimer never outputs anything.
>>
>> Reuven
>>
>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>>
>>>
>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> > I think this wouldn't be very robust to different situations where
>>>> processing time and event time may not be that close to each other.
>>>>
>>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>>> for somewhere in the future, right?  For example, if the watermark is
>>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>>>> timestamps "newer" than the input, but shouldn't ever create late data,
>>>> correct?
>>>>
>>>> Also, imo, the timestamps really already cross domains now, because the
>>>> watermark (event time) is held until the (processing time) timer fires.
>>>>
>>>> The concrete issue that brought this up was a pipeline with some state,
>>>> and the state was "cleaned up" periodically with a processing time timer
>>>> that fired every ~hour.  The author of the pipeline was confused why the
>>>> watermark wasn't moving (and thus GBKs firing, etc).  The root cause was
>>>> the watermark being held by the timer.
>>>>
>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>> calls to setting the event time timer, right?
>>>>
>>>> Correct, the main thing I'm trying to solve is having to recalculate an
>>>> output timestamp using the same logic that the timer itself is using to set
>>>> its firing timestamp.
>>>>
>>>
>>> It sounds like the main use case that you are dealing with is the case
>>> where the timer doesn't actually produce output (or set further timers that
>>> produce output) so it doesn't need (or want) a watermark hold. That makes
>>> sense.
>>>
>>> In fact, I do not view a "watermark hold" as a fundamental concept. The
>>> act of "set a timer with the intent that I am allowed to produce output
>>> with timestamp X" is the fundamental concept, and watermark hold is an
>>> implementation detail that should really never have been surfaced as an
>>> end-user concept, or really even as an SDK author concept. This is why in
>>> my proposal for adding output timestamps to timers, I called it
>>> "withOutputTimestamp", and this is why the design does not include any
>>> watermark holds - there is a self-loop on a transform where timers produce
>>> an input watermark distinct from the watermark on input elements, and that
>>> is enough. There is not now, and never has been, a need for the concept of
>>> a hold at the level of the Beam model.
>>>
>>> I wonder if we can automate this behavior by noticing that there is no
>>> OutputReceiver parameters to the timer callback, and also transitively. Or
>>> just work around it by saying ".withoutOutput" on the timer.
>>>
>>> Kenn
>>>
>>>
>>>>
>>>>
>>>>
>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> If I have a processing time timer, is there any way to automatically
>>>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>>>> event-time timers work).
>>>>>>
>>>>>> A common use case would be to do something like:
>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>
>>>>>
>>>>>
>>>>> but have the output timestamp be the firing timestamp.  In order to do
>>>>>> this now you need to re-calculate the output timestamp (using the same
>>>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>>>
>>>>>
>>>>> I think this wouldn't be very robust to different situations where
>>>>> processing time and event time may not be that close to each other. In
>>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>>
>>>>>
>>>>>> I'm not sure what the API would look like here, but it would also be
>>>>>> nice to allow event-time timers to do the same in reverse (use the element
>>>>>> input timestamp rather than the firing timestamp).  Maybe something like
>>>>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>>>> ELEMENT_TIMESTAMP?
>>>>>>
>>>>>
>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>>> calls to setting the event time timer, right? It doesn't work in general
>>>>> because a timer can be set from other OnTimer methods, where there is no
>>>>> "element" per se, but just the output timestamp of the fired timer.
>>>>>
>>>>> Kenn
>>>>>
>>>>

Re: Default output timestamp of processing-time timers

Posted by Kenneth Knowles <ke...@apache.org>.
This is an interesting case, and a legitimate counterexample to consider.
I'd call it a workaround :-). The semantic thing they would want/need is
"output timestamp" associated with buffered data (also implemented with
watermark hold). I do know systems that designed their state with this
built in.

Kenn

On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:

> One note - some people definitely use timer.withOutputTimestamp as a
> watermark hold.
>

> This is a scenario in which one outputs (from processElement) a timestamp
> behind the current input element timestamp but knows that it is safe
> because there is already an extent timer with an earlier output timestamp
> (state can be used for this). In this case I've seen timers set simply for
> the hold - the actual onTimer never outputs anything.
>
> Reuven
>
> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <ke...@apache.org> wrote:
>
>>
>>
>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> > I think this wouldn't be very robust to different situations where
>>> processing time and event time may not be that close to each other.
>>>
>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>> for somewhere in the future, right?  For example, if the watermark is
>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>>> timestamps "newer" than the input, but shouldn't ever create late data,
>>> correct?
>>>
>>> Also, imo, the timestamps really already cross domains now, because the
>>> watermark (event time) is held until the (processing time) timer fires.
>>>
>>> The concrete issue that brought this up was a pipeline with some state,
>>> and the state was "cleaned up" periodically with a processing time timer
>>> that fired every ~hour.  The author of the pipeline was confused why the
>>> watermark wasn't moving (and thus GBKs firing, etc).  The root cause was
>>> the watermark being held by the timer.
>>>
>>> > It would just save you .withOutputTimestamp(elementTimestamp) on your
>>> calls to setting the event time timer, right?
>>>
>>> Correct, the main thing I'm trying to solve is having to recalculate an
>>> output timestamp using the same logic that the timer itself is using to set
>>> its firing timestamp.
>>>
>>
>> It sounds like the main use case that you are dealing with is the case
>> where the timer doesn't actually produce output (or set further timers that
>> produce output) so it doesn't need (or want) a watermark hold. That makes
>> sense.
>>
>> In fact, I do not view a "watermark hold" as a fundamental concept. The
>> act of "set a timer with the intent that I am allowed to produce output
>> with timestamp X" is the fundamental concept, and watermark hold is an
>> implementation detail that should really never have been surfaced as an
>> end-user concept, or really even as an SDK author concept. This is why in
>> my proposal for adding output timestamps to timers, I called it
>> "withOutputTimestamp", and this is why the design does not include any
>> watermark holds - there is a self-loop on a transform where timers produce
>> an input watermark distinct from the watermark on input elements, and that
>> is enough. There is not now, and never has been, a need for the concept of
>> a hold at the level of the Beam model.
>>
>> I wonder if we can automate this behavior by noticing that there is no
>> OutputReceiver parameters to the timer callback, and also transitively. Or
>> just work around it by saying ".withoutOutput" on the timer.
>>
>> Kenn
>>
>>
>>>
>>>
>>>
>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org>
>>>> wrote:
>>>>
>>>>> If I have a processing time timer, is there any way to automatically
>>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>>> event-time timers work).
>>>>>
>>>>> A common use case would be to do something like:
>>>>> timer.offset(X).align(Y).setRelative()
>>>>>
>>>>
>>>>
>>>> but have the output timestamp be the firing timestamp.  In order to do
>>>>> this now you need to re-calculate the output timestamp (using the same
>>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>>
>>>>
>>>> I think this wouldn't be very robust to different situations where
>>>> processing time and event time may not be that close to each other. In
>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>
>>>>
>>>>> I'm not sure what the API would look like here, but it would also be
>>>>> nice to allow event-time timers to do the same in reverse (use the element
>>>>> input timestamp rather than the firing timestamp).  Maybe something like
>>>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>>> ELEMENT_TIMESTAMP?
>>>>>
>>>>
>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>> calls to setting the event time timer, right? It doesn't work in general
>>>> because a timer can be set from other OnTimer methods, where there is no
>>>> "element" per se, but just the output timestamp of the fired timer.
>>>>
>>>> Kenn
>>>>
>>>

Re: Default output timestamp of processing-time timers

Posted by Reuven Lax <re...@google.com>.
One note - some people definitely use timer.withOutputTimestamp as a
watermark hold.

This is a scenario in which one outputs (from processElement) a timestamp
behind the current input element timestamp but knows that it is safe
because there is already an extent timer with an earlier output timestamp
(state can be used for this). In this case I've seen timers set simply for
the hold - the actual onTimer never outputs anything.

Reuven

On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <ke...@apache.org> wrote:

>
>
> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> > I think this wouldn't be very robust to different situations where
>> processing time and event time may not be that close to each other.
>>
>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>> for somewhere in the future, right?  For example, if the watermark is
>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>> timestamps "newer" than the input, but shouldn't ever create late data,
>> correct?
>>
>> Also, imo, the timestamps really already cross domains now, because the
>> watermark (event time) is held until the (processing time) timer fires.
>>
>> The concrete issue that brought this up was a pipeline with some state,
>> and the state was "cleaned up" periodically with a processing time timer
>> that fired every ~hour.  The author of the pipeline was confused why the
>> watermark wasn't moving (and thus GBKs firing, etc).  The root cause was
>> the watermark being held by the timer.
>>
>> > It would just save you .withOutputTimestamp(elementTimestamp) on your
>> calls to setting the event time timer, right?
>>
>> Correct, the main thing I'm trying to solve is having to recalculate an
>> output timestamp using the same logic that the timer itself is using to set
>> its firing timestamp.
>>
>
> It sounds like the main use case that you are dealing with is the case
> where the timer doesn't actually produce output (or set further timers that
> produce output) so it doesn't need (or want) a watermark hold. That makes
> sense.
>
> In fact, I do not view a "watermark hold" as a fundamental concept. The
> act of "set a timer with the intent that I am allowed to produce output
> with timestamp X" is the fundamental concept, and watermark hold is an
> implementation detail that should really never have been surfaced as an
> end-user concept, or really even as an SDK author concept. This is why in
> my proposal for adding output timestamps to timers, I called it
> "withOutputTimestamp", and this is why the design does not include any
> watermark holds - there is a self-loop on a transform where timers produce
> an input watermark distinct from the watermark on input elements, and that
> is enough. There is not now, and never has been, a need for the concept of
> a hold at the level of the Beam model.
>
> I wonder if we can automate this behavior by noticing that there is no
> OutputReceiver parameters to the timer callback, and also transitively. Or
> just work around it by saying ".withoutOutput" on the timer.
>
> Kenn
>
>
>>
>>
>>
>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>>
>>>
>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>>
>>>> If I have a processing time timer, is there any way to automatically
>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>> event-time timers work).
>>>>
>>>> A common use case would be to do something like:
>>>> timer.offset(X).align(Y).setRelative()
>>>>
>>>
>>>
>>> but have the output timestamp be the firing timestamp.  In order to do
>>>> this now you need to re-calculate the output timestamp (using the same
>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>
>>>
>>> I think this wouldn't be very robust to different situations where
>>> processing time and event time may not be that close to each other. In
>>> general I'm skeptical of reusing timestamps across time domains, for just
>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>
>>>
>>>> I'm not sure what the API would look like here, but it would also be
>>>> nice to allow event-time timers to do the same in reverse (use the element
>>>> input timestamp rather than the firing timestamp).  Maybe something like
>>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>> ELEMENT_TIMESTAMP?
>>>>
>>>
>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>> calls to setting the event time timer, right? It doesn't work in general
>>> because a timer can be set from other OnTimer methods, where there is no
>>> "element" per se, but just the output timestamp of the fired timer.
>>>
>>> Kenn
>>>
>>