Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2017/12/12 04:47:20 UTC

Re: [PROPOSAL] "Requires deterministic input"

In discussion on https://github.com/apache/beam/pull/4135 and offline it
came up how this should interact with stateful/timely DoFns.

Highlights:

 - It was suggested to support it for @OnTimer which would mean that the
state is stable on retry
 - But an unfortunate corollary is that for stateful DoFn it should be
stable after each element
 - And also if the state is expected to be stable on replay, then ordering
must be stable, including interleaving of timers and elements

And since stateful DoFn are very useful for sinks, and @RequiresStableInput
is very useful for sinks, I wonder if someone has thoughts about how these
should interact?

In the special case of @OnWindowExpiry (not yet implemented), the most
important timer for flushing state, it wouldn't actually be so bad.

Kenn

On Wed, Nov 15, 2017 at 7:28 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> +1
>
> > On 15. Nov 2017, at 14:07, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> >
> > Agree !
> >
> > Thanks Kenn,
> > Regards
> > JB
> >
> > On 11/15/2017 02:05 PM, Kenneth Knowles wrote:
> >> Reviving this again, since it came up again today in yet another context. I
> >> think it is time to add this as an experimental annotation. I think we know
> >> that we need it, and roughly how it should work, while there are still
> >> finer points to discuss about what it means for input to be stable.
> >> So I filed https://issues.apache.org/jira/browse/BEAM-3194 and whipped up
> >> https://github.com/apache/beam/pull/4135 to move it along a smidge. It will
> >> need to be incorporated into the Beam model's ParDoPayload and the Python
> >> SDK as well.
> >> Kenn
> >> On Tue, Aug 15, 2017 at 2:40 PM, Reuven Lax <re...@google.com.invalid>
> >> wrote:
> >>> Well the Fn API is still being designed, so this is something we'd have to
> >>> think about.
> >>>
> >>> On Tue, Aug 15, 2017 at 2:19 PM, Robert Bradshaw <
> >>> robertwb@google.com.invalid> wrote:
> >>>
> >>>> On Tue, Aug 15, 2017 at 2:14 PM, Reuven Lax <relax@google.com.invalid>
> >>>> wrote:
> >>>>> On Tue, Aug 15, 2017 at 1:59 PM, Robert Bradshaw <
> >>>>> robertwb@google.com.invalid> wrote:
> >>>>>
> >>>>>> On Sat, Aug 12, 2017 at 1:13 AM, Reuven Lax <relax@google.com.invalid>
> >>>>>> wrote:
> >>>>>>> On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <
> >>>>>>>> The question here is whether the ordering is part of the "content" of
> >>>>>>>> an iterable.
> >>>>>>>
> >>>>>>> My initial instinct was to say yes - but maybe it should not be until Beam
> >>>>>>> has a first-class notion of sorted values after a GBK?
> >>>>>>
> >>>>>> Yeah, I'm not sure on this either. Interestingly, if we consider
> >>>>>> ordering to be important, then the composite gbk + ungroup will be
> >>>>>> stable despite its components not being so.
> >>>>>>
> >>>>>>>>>> As I mention above, the iterable is semantically [part of] a single
> >>>>>>>>>> element. So just to unpack this, to make sure we are talking about the same
> >>>>>>>>>> thing, I think you are talking about GBK as implemented via GBKO + GABW.
> >>>>>>>>>>
> >>>>>>>>>> When the output of GABW is required to be stable but the output of GBKO is
> >>>>>>>>>> not stable, we don't have stability for free in all cases by inserting a
> >>>>>>>>>> GBK, but require something more to make the output of GABW stable, in the
> >>>>>>>>>> worst case a full materialization.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Correct. My point is that there are alternate, cheaper ways of doing this.
> >>>>>>>>> If GABW stores state in an ordered list, it can simply checkpoint a marker
> >>>>>>>>> into that list to ensure that the output is stable.
> >>>>>>>>
> >>>>>>>> In the presence of non-trivial triggering and/or late data, I'm not so
> >>>>>>>> sure this is "easy." E.g. a bundle may fail, and more data may come in
> >>>>>>>> from upstream (and get appended to the buffer) before it is retried.
> >>>>>>>>
> >>>>>>>
> >>>>>>> That will still work. If the subsequent ParDo has processed the Iterable,
> >>>>>>> that means we'll have successfully checkpointed a marker to the list (using
> >>>>>>> whatever technique the runner supports). More data coming in will get
> >>>>>>> appended after the marker, so we can ensure that the retry still sees the
> >>>>>>> same elements in the Iterable.
> >>>>>>
> >>>>>> I'm thinking of the following.
> >>>>>>
> >>>>>> 1. (k, v1) and (k, v2) come into the GABW and [v1, v2] gets stored in
> >>>>>> the state. A trigger gets set.
> >>>>>> 2. The trigger is fired and (k, [v1, v2]) gets sent downstream, but
> >>>>>> for some reason fails.
> >>>>>> 3. (k, v3) comes into the GABW and [v3] gets appended to the state.
> >>>>>> 4. The trigger is again fired, and this time (k, [v1, v2, v3]) is sent
> >>>>>> downstream.
> >>>>>>
> >>>>>>
> >>>>> If you add the annotation specifying stable input, then we will not do this.
> >>>>> Before we send anything downstream, we will add a marker to the list, and
> >>>>> only forward data downstream once the marker has been checkpointed. This
> >>>>> adds a bit of cost and latency of course, but the assumption is that adding
> >>>>> this annotation will always add some cost.
> >>>>
> >>>> I don't think you can checkpoint anything "before sending data
> >>>> downstream" if it's being executed as part of a fused graph, unless we
> >>>> add special support for this in the Fn API. I suppose the runner could
> >>>> pre-emptively modify the state of any GABW operations before firing
> >>>> triggers...
> >>>>
> >>>>>> It is unclear when a marker would be added to the list. Is this in
> >>>>>> step 2 which, despite failing, still results in modified state [v1, v2,
> >>>>>> marker]? (And this state modification would have to be committed
> >>>>>> before attempting the bundle, in case the "failure" was something like
> >>>>>> a VM shutdown.) And only on success the state is modified to be (say
> >>>>>> this is accumulating mode) [v1, v2]?
> >>>>>>
> >>>>>> I think it could be done, but it may significantly complicate things.
> >>>>>>
> >>>>
> >>>
> >
> > --
> > Jean-Baptiste Onofré
> > jbonofre@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>
>
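
For reference, here is a minimal sketch of the marker idea from the quoted
exchange, replaying the four-step scenario (v1 and v2 buffered, a failed
firing, v3 arriving, then a retry). It is a toy simulation in plain Python,
not Beam code: MarkedBuffer, fire and flaky_downstream are hypothetical names,
and "checkpointing" is modelled as simply setting a field, under the
assumption that a real runner would commit it durably before emitting.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class MarkedBuffer:
    """Hypothetical per-key buffer for a GABW-like operator (not a Beam API)."""

    values: List[str] = field(default_factory=list)
    # Number of leading values covered by the committed marker, if one exists.
    committed_marker: Optional[int] = None

    def append(self, value: str) -> None:
        # New data always lands after any marker that is already in place.
        self.values.append(value)

    def checkpoint_marker(self) -> None:
        # Stand-in for durably committing a marker before emitting downstream.
        # An existing marker is kept, so a retry reuses the earlier cut-off.
        if self.committed_marker is None:
            self.committed_marker = len(self.values)

    def pane(self) -> List[str]:
        # Only values in front of the committed marker are visible to a firing.
        if self.committed_marker is None:
            raise RuntimeError("marker must be checkpointed before emitting")
        return self.values[: self.committed_marker]

    def ack(self) -> None:
        # Downstream succeeded; clear the marker (accumulating mode keeps values).
        self.committed_marker = None


def fire(buffer: MarkedBuffer, downstream: Callable[[List[str]], None]) -> None:
    buffer.checkpoint_marker()
    downstream(buffer.pane())  # may raise, leaving the marker in place
    buffer.ack()


seen: List[List[str]] = []


def flaky_downstream(pane: List[str]) -> None:
    # Records every attempt and fails only the first one.
    seen.append(list(pane))
    if len(seen) == 1:
        raise RuntimeError("simulated bundle failure")


buf = MarkedBuffer()
buf.append("v1")
buf.append("v2")
try:
    fire(buf, flaky_downstream)  # step 2: firing fails after the marker is set
except RuntimeError:
    pass
buf.append("v3")                 # step 3: late data lands after the marker
fire(buf, flaky_downstream)      # retry sees the same [v1, v2]
fire(buf, flaky_downstream)      # a later firing picks up v3
```

In this toy model the retry observes [v1, v2] rather than [v1, v2, v3], which
is the property under discussion. It says nothing about the fused-graph
concern raised in the thread, since the sketch assumes the marker can always
be committed before the downstream call.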

Re: [PROPOSAL] "Requires deterministic input"

Posted by Reuven Lax <re...@google.com>.
On Mon, Dec 11, 2017 at 8:47 PM, Kenneth Knowles <kl...@google.com> wrote:

> In discussion on https://github.com/apache/beam/pull/4135 and offline it
> came up how this should interact with stateful/timely DoFns.
>
> Highlights:
>
>  - It was suggested to support it for @OnTimer which would mean that the
> state is stable on retry
>

This sounds good.


>  - But an unfortunate corollary is that for stateful DoFn it should be
> stable after each element
>  - And also if the state is expected to be stable on replay, then ordering
> must be stable, including interleaving of timers and elements
>

I'm not sure why this follows? I think rather we simply need the timer not
to fire _until_ state is stable. Elements (and therefore state writes)
might retry in different orders, but the timer does not fire until the
result is committed.
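
To make that concrete, a rough sketch under stated assumptions: StagedState,
process_bundle and fire_timer are hypothetical names for a toy runner-side
model in plain Python, not Beam APIs. Element bundles may be retried in a
different order, but the timer callback only ever receives a snapshot of
committed state, so a retried timer firing sees the same contents.

```python
from typing import Callable, List, Tuple


class StagedState:
    """Hypothetical state cell with separate staged and committed views."""

    def __init__(self) -> None:
        self.committed: List[str] = []
        self.staged: List[str] = []

    def write(self, value: str) -> None:
        self.staged.append(value)

    def commit(self) -> None:
        self.committed.extend(self.staged)
        self.staged.clear()

    def rollback(self) -> None:
        self.staged.clear()


def process_bundle(state: StagedState, elements: List[str], fail: bool = False) -> bool:
    # Writes are staged per bundle and only become visible on commit.
    for element in elements:
        state.write(element)
    if fail:
        state.rollback()
        return False
    state.commit()
    return True


def fire_timer(state: StagedState, on_timer: Callable[[Tuple[str, ...]], None]) -> None:
    # The timer is held back while any uncommitted writes exist.
    if state.staged:
        raise RuntimeError("timer must not fire on uncommitted state")
    on_timer(tuple(state.committed))


state = StagedState()
process_bundle(state, ["a", "b"], fail=True)  # first attempt is discarded
process_bundle(state, ["b", "a"])             # retry arrives in another order

observed: List[Tuple[str, ...]] = []


def flaky_timer(snapshot: Tuple[str, ...]) -> None:
    # Records every firing and fails only the first one.
    observed.append(snapshot)
    if len(observed) == 1:
        raise RuntimeError("simulated timer failure")


try:
    fire_timer(state, flaky_timer)
except RuntimeError:
    pass
fire_timer(state, flaky_timer)  # the retried firing sees the same snapshot
```

The order that ends up committed depends on which attempt succeeded, but both
timer firings observe that same committed order.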


>
> And since stateful DoFn are very useful for sinks, and
> @RequiresStableInput is very useful for sinks, I wonder if someone has
> thoughts about how these should interact?
>
> In the special case of @OnWindowExpiry (not yet implemented), the most
> important timer for flushing state, it wouldn't actually be so bad.
>
> Kenn