Posted to dev@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2020/08/11 15:59:52 UTC

Output timestamp for Python event timers

We ran into problems setting event time timers per-element in the Python 
SDK. Pipeline progress would stall.

Turns out, although the Python SDK does not expose the timer output 
timestamp feature to the user, it sets the timer output timestamp to the 
current input timestamp of an element.

This holds back the watermark until the timer fires (the Flink Runner
respects the timer output timestamp when advancing the output
watermark). We had set the fire timestamp so far in the future that
pipeline progress completely stalled for downstream transforms, due to
the held-back watermark.

Considering that this feature is not even exposed to the user in the
Python SDK, I think we should set the default output timestamp to the
fire timestamp, and not to the input timestamp. This is also how timers
work in the Java SDK.
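To make the stall concrete, here is a toy model under one simplifying assumption: a stage's output watermark is the minimum of its input watermark and the hold timestamps of its pending timers. This is not Flink Runner or Beam code; the helper output_watermark and all numbers are hypothetical, chosen only to illustrate the two defaults.

```python
# Toy model (not Flink or Beam code): a stage's output watermark may not
# advance past its input watermark or past any pending timer's hold.


def output_watermark(input_watermark, pending_holds):
    """Return the output watermark given the input watermark and the hold
    (output) timestamps of timers that have not fired yet."""
    return min([input_watermark, *pending_holds])


# A per-element event time timer: the element has timestamp 5 and the
# timer is set to fire far in the future.
input_timestamp = 5
fire_timestamp = 10**12
input_watermark = 100  # the input has progressed well past the element

# Current behavior: the hold is the element's input timestamp.
stalled = output_watermark(input_watermark, [input_timestamp])

# Proposed default for event time timers: the hold is the fire timestamp.
progressing = output_watermark(input_watermark, [fire_timestamp])

print(stalled)      # 5: downstream watermark is stuck until the timer fires
print(progressing)  # 100: downstream watermark follows the input
```

With the fire timestamp as the hold, the output watermark still cannot pass the fire timestamp before the timer fires, but that is harmless here: the event time timer fires once the input watermark reaches it.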

Let me know what you think.

-Max

PR: https://github.com/apache/beam/pull/12531

Re: Output timestamp for Python event timers

Posted by Boyuan Zhang <bo...@google.com>.
Thanks for your help! I'll take a look at the PR.

On Wed, Aug 12, 2020 at 2:27 AM Maximilian Michels <mx...@apache.org> wrote:

> Thanks for your suggestions!
>
> It makes sense to complete the work on this feature by exposing it in
> the Python API. We can do this as a next step. (There might be open
> questions about how exactly to do that.)
>
> For now, I'm concerned with getting the semantics right and unblocking
> users from stalling pipelines.
>
> I wasn't aware that processing timers used the input timestamp as the
> timer output timestamp. I've updated the PR accordingly. Please take a
> look: https://github.com/apache/beam/pull/12531
>
> -Max
>

Re: Output timestamp for Python event timers

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for your suggestions!

It makes sense to complete the work on this feature by exposing it in 
the Python API. We can do this as a next step. (There might be open 
questions about how exactly to do that.)

For now, I'm concerned with getting the semantics right and unblocking 
users from stalling pipelines.

I wasn't aware that processing timers used the input timestamp as the 
timer output timestamp. I've updated the PR accordingly. Please take a 
look: https://github.com/apache/beam/pull/12531

-Max

On 12.08.20 05:03, Luke Cwik wrote:
> +1 on what Boyuan said. It is important that the defaults for the
> processing time domain differ from the defaults for the event time domain.

Re: Output timestamp for Python event timers

Posted by Luke Cwik <lc...@google.com>.
+1 on what Boyuan said. It is important that the defaults for the
processing time domain differ from the defaults for the event time domain.

On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang <zy...@google.com> wrote:

> +1 to exposing set_output_timestamp and enriching the Python API for
> setting timers.

Re: Output timestamp for Python event timers

Posted by Yichi Zhang <zy...@google.com>.
+1 to exposing set_output_timestamp and enriching the Python API for
setting timers.

On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang <bo...@google.com> wrote:

> Hi Maximilian,
>
> It makes sense to set hold_timestamp to fire_timestamp when the
> fire_timestamp is in the event time domain. Otherwise, the system may
> advance the watermark incorrectly.
> I think we can do something similar to FnApiDoFnRunner in the Java SDK
> harness[1]:
>
>    - Expose a set_output_timestamp API on Python timers as well
>    - If set_output_timestamp is not specified and the timer is in the
>    event time domain, use fire_timestamp as hold_timestamp
>    - Otherwise, use input_timestamp as hold_timestamp.
>
> What do you think?
>
> [1]
> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493

Re: Output timestamp for Python event timers

Posted by Boyuan Zhang <bo...@google.com>.
Hi Maximilian,

It makes sense to set hold_timestamp to fire_timestamp when the
fire_timestamp is in the event time domain. Otherwise, the system may
advance the watermark incorrectly.
I think we can do something similar to FnApiDoFnRunner in the Java SDK
harness[1]:

   - Expose a set_output_timestamp API on Python timers as well
   - If set_output_timestamp is not specified and the timer is in the event
   time domain, use fire_timestamp as hold_timestamp
   - Otherwise, use input_timestamp as hold_timestamp.
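The three rules can be sketched as a small decision function. This is a hypothetical helper, not the Python SDK's or FnApiDoFnRunner's actual code: the name default_hold_timestamp and the TimeDomain enum are local stand-ins (not apache_beam's TimeDomain), and timestamps are plain integers.

```python
from enum import Enum
from typing import Optional


class TimeDomain(Enum):
    # Hypothetical stand-in for the SDK's timer time domains.
    EVENT_TIME = "event_time"
    PROCESSING_TIME = "processing_time"


def default_hold_timestamp(time_domain: TimeDomain,
                           fire_timestamp: int,
                           input_timestamp: int,
                           output_timestamp: Optional[int] = None) -> int:
    """Pick the hold (output) timestamp for a timer per the rules above."""
    # Rule 1: an explicitly requested output timestamp always wins.
    if output_timestamp is not None:
        return output_timestamp
    # Rule 2: event time timers default to their fire timestamp.
    if time_domain is TimeDomain.EVENT_TIME:
        return fire_timestamp
    # Rule 3: otherwise (processing time), use the element's timestamp.
    return input_timestamp


print(default_hold_timestamp(TimeDomain.EVENT_TIME, 1000, 5))       # 1000
print(default_hold_timestamp(TimeDomain.PROCESSING_TIME, 1000, 5))  # 5
print(default_hold_timestamp(TimeDomain.EVENT_TIME, 1000, 5, 20))   # 20
```

Comparing output_timestamp against None, rather than testing its truthiness, keeps an explicit output timestamp of 0 from being silently ignored. The processing time fallback presumably exists because a processing time fire timestamp is a wall-clock value, not an event time, so it would not be meaningful as a watermark hold.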

What do you think?

[1]
https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493




On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels <mx...@apache.org> wrote:

> We ran into problems setting event time timers per-element in the Python
> SDK. Pipeline progress would stall.
>
> Turns out, although the Python SDK does not expose the timer output
> timestamp feature to the user, it sets the timer output timestamp to the
> current input timestamp of an element.
>
> This holds back the watermark until the timer fires (the Flink Runner
> respects the timer output timestamp when advancing the output
> watermark). We had set the fire timestamp so far in the future that
> pipeline progress completely stalled for downstream transforms, due to
> the held-back watermark.
>
> Considering that this feature is not even exposed to the user in the
> Python SDK, I think we should set the default output timestamp to the
> fire timestamp, and not to the input timestamp. This is also how timers
> work in the Java SDK.
>
> Let me know what you think.
>
> -Max
>
> PR: https://github.com/apache/beam/pull/12531
>