Posted to user@beam.apache.org by Amit Sela <am...@gmail.com> on 2017/02/19 22:30:03 UTC

Setting synchronized processing time triggers

Hi all,

I was wondering how to use the AfterSynchronizedProcessingTime trigger. For
processing time triggers there's the
AfterProcessingTime.pastFirstElementInPane() API, but I've noticed that the
AfterSynchronizedProcessingTime constructor is only called from a
continuation trigger.
I wonder if someone could explain how pipeline authors should use this
trigger, and maybe give a general explanation of processing time triggers,
since (as far as I could find) most existing
documentation/presentations/talks focus on watermark-based triggers.

Thanks,
Amit
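For context on processing-time triggers in general: the trigger Ben describes in his reply ("5 minutes after the hour") would, as far as I know, be written in the Java SDK as AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardHours(1)).plusDelayOf(Duration.standardMinutes(5)). The sketch below is not Beam code. It is a toy model of the timing arithmetic only, assuming that alignment rounds the first element's processing time up to the next period boundary (counted from the epoch) and that the delay is added afterwards. ProcessingTimeTriggerModel, alignUp and fireTime are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Toy model (not Beam code) of when a processing-time trigger such as
 * "first element in pane, aligned to the hour, plus a 5 minute delay" fires.
 */
public class ProcessingTimeTriggerModel {

  /** Rounds t up to the next multiple of period counted from offset; exact boundaries stay put. */
  static Instant alignUp(Instant t, Duration period, Instant offset) {
    long sinceBoundary = Math.floorMod(Duration.between(offset, t).toMillis(), period.toMillis());
    return sinceBoundary == 0 ? t : t.plus(period).minusMillis(sinceBoundary);
  }

  /** Assumed model: fire time = align(processing time of first element) + delay. */
  static Instant fireTime(Instant firstElement, Duration period, Duration delay) {
    return alignUp(firstElement, period, Instant.EPOCH).plus(delay);
  }

  public static void main(String[] args) {
    Instant first = Instant.parse("2017-02-19T12:37:10Z");
    Instant fire = fireTime(first, Duration.ofHours(1), Duration.ofMinutes(5));
    System.out.println(fire); // 2017-02-19T13:05:00Z
  }
}
```

With the first element arriving at 12:37:10, the pane would fire at 13:05:00 under this model; an element arriving exactly on the hour is not pushed to the following hour.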

Re: Setting synchronized processing time triggers

Posted by Amit Sela <am...@gmail.com>.
Making this package-private would also remove this from the user list ;-)

I thought of synchronized processing time more along the lines of what Kenn
suggests, for example:
The Spark runner has its internal synchronized clock that triggers actions
per interval (the micro-batch clock), and I was thinking of using it, but
the API doesn't seem to be a fit in its current state.

I also thought of having a synchronized time trigger API for users to use
directly to trigger periodic actions.
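A toy model of the micro-batch clock described above, assuming every worker moves to the next batch at the same moment so that all of them read the same batch time. It is not the Spark runner's implementation; MicroBatchClock and its methods are hypothetical. Under this model, a periodic action targeting processing time T runs in the first batch whose time is at or after T, which is what makes such a clock naturally "synchronized".

```java
/** Hypothetical shared clock that advances in fixed micro-batch steps (times in ms). */
public class MicroBatchClock {
  private final long intervalMillis;
  private long batchTime; // assumed identical on every worker for the current batch

  MicroBatchClock(long startMillis, long intervalMillis) {
    this.batchTime = startMillis;
    this.intervalMillis = intervalMillis;
  }

  long now() {
    return batchTime;
  }

  /** Moves every worker to the next batch at once. */
  void advance() {
    batchTime += intervalMillis;
  }

  /** Number of batches from the current one (0) until the first batch whose time is >= target. */
  long batchesUntil(long targetMillis) {
    if (targetMillis <= batchTime) {
      return 0;
    }
    return (targetMillis - batchTime + intervalMillis - 1) / intervalMillis;
  }

  public static void main(String[] args) {
    MicroBatchClock clock = new MicroBatchClock(0, 1_000);
    long target = 2_500;
    long firedAt = -1;
    // A periodic action fires in the first batch at or after its target time.
    for (int i = 0; i < 5 && firedAt < 0; i++) {
      if (clock.now() >= target) {
        firedAt = clock.now();
      }
      clock.advance();
    }
    System.out.println("fired at batch time " + firedAt); // fired at batch time 3000
  }
}
```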

On Mon, Feb 20, 2017 at 7:15 AM Kenneth Knowles <kl...@google.com.invalid>
wrote:

> For the dev list: there is another approach that is a bigger change, hence
> might not be worth taking on right now (or ever).
>
> Today, the triggering is always explicitly specified, and the SDK produces
> the continuation trigger and annotates PCollections as appropriate. We
> could push this responsibility to the runner, thus removing the
> AfterSynchronizedProcessingTime class from the SDK entirely. One argument
> in favor of this is that runners may have their own ways of emitting
> downstream output as fast as reasonable (e.g. punctuations).
>
> Kenn

Re: Setting synchronized processing time triggers

Posted by Kenneth Knowles <kl...@google.com.INVALID>.
For the dev list: there is another approach that is a bigger change, hence
might not be worth taking on right now (or ever).

Today, the triggering is always explicitly specified, and the SDK produces
the continuation trigger and annotates PCollections as appropriate. We
could push this responsibility to the runner, thus removing the
AfterSynchronizedProcessingTime class from the SDK entirely. One argument
in favor of this is that runners may have their own ways of emitting
downstream output as fast as reasonable (e.g. punctuations).

Kenn
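One way to picture this alternative, as a hypothetical sketch (none of these types exist in Beam): the downstream firing decision sits behind an interface that the runner implements. The first implementation mimics the SDK-derived continuation as described in this thread (wait for the last upstream firing for a target time, add nothing); the second assumes a micro-batch runner that can only emit output at batch boundaries. Times are processing-time milliseconds.

```java
import java.util.List;

/**
 * Hypothetical sketch (no such types exist in Beam) of a runner, rather than
 * the SDK, deciding when a downstream GroupByKey fires.
 */
public class RunnerContinuation {

  /** Given the upstream firing times for one target time, when may downstream fire? */
  interface ContinuationPolicy {
    long downstreamFireTime(List<Long> upstreamFirings);
  }

  /** Mimics the SDK-derived continuation: wait for the last upstream firing, add no delay. */
  static final class SynchronizedPolicy implements ContinuationPolicy {
    @Override
    public long downstreamFireTime(List<Long> upstreamFirings) {
      // Throws NoSuchElementException if there were no upstream firings.
      return upstreamFirings.stream().mapToLong(Long::longValue).max().getAsLong();
    }
  }

  /** Assumed micro-batch runner: output can only leave at the next batch boundary. */
  static final class MicroBatchPolicy implements ContinuationPolicy {
    private final long intervalMillis;

    MicroBatchPolicy(long intervalMillis) {
      this.intervalMillis = intervalMillis;
    }

    @Override
    public long downstreamFireTime(List<Long> upstreamFirings) {
      long last = upstreamFirings.stream().mapToLong(Long::longValue).max().getAsLong();
      // Round up to a multiple of the batch interval (non-negative times assumed).
      return ((last + intervalMillis - 1) / intervalMillis) * intervalMillis;
    }
  }

  public static void main(String[] args) {
    List<Long> upstream = List.of(3_600_000L, 3_601_500L);
    ContinuationPolicy[] policies = {new SynchronizedPolicy(), new MicroBatchPolicy(1_000)};
    for (ContinuationPolicy p : policies) {
      System.out.println(p.getClass().getSimpleName() + " -> " + p.downstreamFireTime(upstream));
    }
  }
}
```

The point of the interface is only that the SDK would no longer need to know which mechanism is used; each runner would plug in whatever "as fast as reasonable" means for it.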

On Sun, Feb 19, 2017 at 9:07 PM, Kenneth Knowles <kl...@google.com> wrote:

> Moving this discussion to dev@, with user@ in BCC, since I also doubt that
> this should be user-facing. Feel free to restore it if you want.
>
> The only way it could really be meaningful would be following a DoFn that
> uses timers in pretty advanced ways. I can't really come up with a real
> example. It is public only due to Java visibility constraints needed to
> appropriately map the trigger to an executable state machine. Work currently
> in a PR will remove those constraints; then we can make it package-private
> again.
>
> Kenn

Re: Setting synchronized processing time triggers

Posted by Kenneth Knowles <kl...@google.com>.
Moving this discussion to dev@, with user@ in BCC, since I also doubt that
this should be user-facing. Feel free to restore it if you want.

The only way it could really be meaningful would be following a DoFn that
uses timers in pretty advanced ways. I can't really come up with a real
example. It is public only due to Java visibility constraints needed to
appropriately map the trigger to an executable state machine. Work currently
in a PR will remove those constraints; then we can make it package-private
again.

Kenn

On Sun, Feb 19, 2017 at 2:38 PM, Ben Chambers <bc...@apache.org> wrote:

> The continuation trigger is automatically used after the first group by
> key with a trigger. It is an attempt to trigger "as fast as reasonable"
> based on the original trigger. For example, if the trigger was 5 minutes
> after the hour (so aligned to an hour and then delayed by 5m), it wouldn't
> be good for every group by key to reintroduce the same delay. Instead, it
> uses synchronized processing time to wait for all upstream firings for the
> same target time. This ensures that all of the aligned triggers
> have been processed, but no delay beyond that is introduced.
>
> Are there use cases for a user using synchronized processing time directly?

Re: Setting synchronized processing time triggers

Posted by Ben Chambers <bc...@apache.org>.
The continuation trigger is automatically used after the first group by key
with a trigger. It is an attempt to trigger "as fast as reasonable" based
on the original trigger. For example, if the trigger was 5 minutes after
the hour (so aligned to an hour and then delayed by 5m), it wouldn't be good
for every group by key to reintroduce the same delay. Instead, it uses
synchronized processing time to wait for all upstream firings for the same
target time. This ensures that all of the aligned triggers have
been processed, but no delay beyond that is introduced.

Are there use cases for a user using synchronized processing time directly?
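A toy model (not Beam code) of the example above. Assumptions: times are minutes of processing time, the upstream trigger targets 13:05 (minute 785), and three upstream workers fire for it, one of them a minute late. As a simplification, a naive downstream stage just re-adds the 5-minute delay. The synchronized version fires as soon as the last upstream firing for that target has been processed, and a later stage would apply the same rule rather than add to it. ContinuationTriggerModel and its methods are hypothetical names.

```java
import java.util.List;

/** Toy model (not Beam code): naive re-delay vs. synchronized processing time. */
public class ContinuationTriggerModel {

  /** Naive: each downstream GroupByKey stage re-applies the same delay. */
  static long naiveFireTime(long upstreamFireTime, long delay, int downstreamStages) {
    return upstreamFireTime + delay * downstreamStages;
  }

  /**
   * Synchronized: fire once every upstream firing for the same target time
   * has been processed, adding no delay of its own.
   */
  static long synchronizedFireTime(List<Long> upstreamFireTimes) {
    long latest = Long.MIN_VALUE;
    for (long t : upstreamFireTimes) {
      latest = Math.max(latest, t);
    }
    return latest;
  }

  public static void main(String[] args) {
    // Upstream workers fire for target 13:05 (minute 785); one fires a minute late.
    List<Long> upstream = List.of(785L, 785L, 786L);
    System.out.println("naive, after 3 stages: " + naiveFireTime(786, 5, 3)); // 801
    System.out.println("synchronized:          " + synchronizedFireTime(upstream)); // 786
  }
}
```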

On Sun, Feb 19, 2017, 2:30 PM Amit Sela <am...@gmail.com> wrote:

> Hi all,
>
> I was wondering how to use the AfterSynchronizedProcessingTime trigger. For
> processing time triggers there's the
> AfterProcessingTime.pastFirstElementInPane() API, but I've noticed that the
> AfterSynchronizedProcessingTime constructor is only called from a
> continuation trigger.
> I wonder if someone could explain how pipeline authors should use this
> trigger, and maybe give a general explanation of processing time triggers,
> since (as far as I could find) most existing
> documentation/presentations/talks focus on watermark-based triggers.
>
> Thanks,
> Amit
>