Posted to dev@beam.apache.org by Eugene Kirpichov <ki...@google.com.INVALID> on 2017/04/05 23:55:39 UTC

Proposed Splittable DoFn API changes

Hey all,

Based on recent experience continuing the implementation of Splittable DoFn,
I would like to propose a few changes to its API. They fix a bug, make parts
of its semantics better defined and easier for users to get right, and
reduce the assumptions made about the runner implementation.

In short:
- Add c.updateWatermark() and report the watermark continuously via this
method.
- Make SDF.@ProcessElement return void, which is simpler for users, though
it no longer allows resuming after a specified time.
- Declare that SDF.@ProcessElement must guarantee that, after it returns,
the entire tracker.currentRestriction() has been processed.
- Add a bool RestrictionTracker.done() method to enforce the bullet above.
- For resuming after a specified time, use a regular DoFn with the state and
timers API.
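
As an illustration of the third and fourth bullets, here is a minimal,
self-contained sketch of the contract in plain Java. ToyRange, ToyTracker and
ToyRunner are hypothetical stand-ins invented for this example and are not the
Beam classes; only the idea (the runner checks done() after the processing
call returns) is taken from the list above.

```java
import java.util.function.Consumer;

// Toy model of the proposed contract; these are NOT the Beam classes.
final class ToyRange {
  final long from; // inclusive
  final long to;   // exclusive

  ToyRange(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

final class ToyTracker {
  private final ToyRange range;
  private long nextUnclaimed; // offsets below this are considered handled

  ToyTracker(ToyRange range) {
    this.range = range;
    this.nextUnclaimed = range.from;
  }

  ToyRange currentRestriction() {
    return range;
  }

  // Claims offset i. Returns false once i is past the end of the restriction,
  // which is the caller's signal to stop.
  boolean tryClaim(long i) {
    if (i < nextUnclaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (i >= range.to) {
      nextUnclaimed = range.to;
      return false;
    }
    nextUnclaimed = i + 1;
    return true;
  }

  // True when no offset in the restriction is left unclaimed.
  boolean done() {
    return nextUnclaimed >= range.to;
  }
}

final class ToyRunner {
  // Calls the user's processing logic, then enforces the contract.
  static void invoke(Consumer<ToyTracker> process, ToyTracker tracker) {
    process.accept(tracker);
    if (!tracker.done()) {
      throw new IllegalStateException(
          "process() returned before the whole restriction was processed");
    }
  }
}
```

In this toy model, a loop that keeps calling tryClaim() until it returns false
always passes the check, while a body that returns early is rejected.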

The only downside is the removal (from SDF) of the ability to suspend the
call for a certain amount of time. The suggestion is that, if you need that,
you should use a regular DoFn and the timers API.

Please see the full proposal in the following doc and comment there & vote
on this thread.
https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing


I am going to concurrently start prototyping some parts of this proposal,
because the current implementation is simply wrong and this proposal is the
only way to fix it that I can think of, but I will adjust my implementation
based on the discussion. I believe this proposal should not affect runner
authors - I can make all the necessary changes myself.

Thanks!

Re: Proposed Splittable DoFn API changes

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi all,
Heads up: https://github.com/apache/beam/pull/3360 reintroduces
ProcessContinuation, and the associated JIRA
https://issues.apache.org/jira/browse/BEAM-2447 explains why, and how to
make it work without the issues described in this document.
Thanks.

On Sat, Apr 8, 2017 at 6:56 AM Aljoscha Krettek <al...@apache.org> wrote:

> +1
>
> I was too busy with traveling and preparations for Flink Forward but I
> wanted to retroactively confirm that these are good changes. :-)

Re: Proposed Splittable DoFn API changes

Posted by Aljoscha Krettek <al...@apache.org>.
+1

I was too busy with traveling and preparations for Flink Forward but I wanted to retroactively confirm that these are good changes. :-)
> On 7. Apr 2017, at 22:43, Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
> 
> Hi Eugene,
> 
> thanks for the update and nice example.
> 
> I plan to start to refactor/experiment on some IOs.
> 
> Regards
> JB


Re: Proposed Splittable DoFn API changes

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Eugene,

thanks for the update and nice example.

I plan to start to refactor/experiment on some IOs.

Regards
JB

On 04/08/2017 02:44 AM, Eugene Kirpichov wrote:
> The changes are in.
>
> Also included is a handy change that lets you skip implementing the
> NewTracker method if the restriction type implements HasDefaultTracker,
> leaving ProcessElement and GetInitialRestriction as the only two required
> methods.
>
> E.g. here's what a minimal SDF example looks like now - splittably pairing
> a string with every number in [0, 100):
>
>   class CountFn extends DoFn<String, KV<String, Long>> {
>     @ProcessElement
>     public void process(ProcessContext c, OffsetRangeTracker tracker) {
>       for (long i = tracker.currentRestriction().getFrom();
>           tracker.tryClaim(i); ++i) {
>         c.output(KV.of(c.element(), i));
>       }
>     }
>
>     @GetInitialRestriction
>     public OffsetRange getInitialRange(String element) {
>       return new OffsetRange(0, 100);
>     }
>   }

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: Proposed Splittable DoFn API changes

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
The changes are in.

Also included is a handy change that lets you skip implementing the
NewTracker method if the restriction type implements HasDefaultTracker,
leaving ProcessElement and GetInitialRestriction as the only two required
methods.

E.g. here's what a minimal SDF example looks like now - splittably pairing
a string with every number in [0, 100):

  class CountFn extends DoFn<String, KV<String, Long>> {
    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
      for (long i = tracker.currentRestriction().getFrom();
          tracker.tryClaim(i); ++i) {
        c.output(KV.of(c.element(), i));
      }
    }

    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
      return new OffsetRange(0, 100);
    }
  }
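
To show why the tryClaim loop above stays correct when the range is split, and
what "the restriction type supplies its own tracker" can look like, here is a
rough, self-contained model in plain Java. DemoHasDefaultTracker, DemoRange,
DemoTracker, CountDemo and splitAt() are hypothetical names invented for this
sketch and are not the Beam API; output goes to a list instead of c.output().

```java
import java.util.Arrays;
import java.util.List;

// Toy stand-in for "the restriction type knows how to create its tracker".
interface DemoHasDefaultTracker<T> {
  T newTracker();
}

// Half-open offset range [from, to).
final class DemoRange implements DemoHasDefaultTracker<DemoTracker> {
  final long from;
  final long to;

  DemoRange(long from, long to) {
    this.from = from;
    this.to = to;
  }

  @Override
  public DemoTracker newTracker() {
    return new DemoTracker(this);
  }

  // Splits [from, to) into two adjacent sub-ranges at the given offset.
  List<DemoRange> splitAt(long offset) {
    if (offset <= from || offset >= to) {
      throw new IllegalArgumentException("offset must be strictly inside the range");
    }
    return Arrays.asList(new DemoRange(from, offset), new DemoRange(offset, to));
  }
}

final class DemoTracker {
  private final DemoRange range;

  DemoTracker(DemoRange range) {
    this.range = range;
  }

  DemoRange currentRestriction() {
    return range;
  }

  // Succeeds only for offsets inside this tracker's own sub-range.
  boolean tryClaim(long i) {
    return i >= range.from && i < range.to;
  }
}

final class CountDemo {
  // Same loop shape as CountFn.process, writing to a list.
  static void process(String element, DemoTracker tracker, List<String> out) {
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.add(element + ":" + i);
    }
  }
}
```

Running CountDemo.process once per sub-range returned by
new DemoRange(0, 100).splitAt(40), each with its own newTracker(), emits every
offset in [0, 100) exactly once, because each loop stops at the first failed
claim.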


On Thu, Apr 6, 2017 at 3:16 PM Eugene Kirpichov <ki...@google.com>
wrote:

> FWIW, here's a pull request implementing these changes:
> https://github.com/apache/beam/pull/2455

Re: Proposed Splittable DoFn API changes

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
FWIW, here's a pull request implementing these changes:
https://github.com/apache/beam/pull/2455

On Wed, Apr 5, 2017 at 4:55 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Hey all,
>
> Based on recent experience continuing the implementation of Splittable
> DoFn, I would like to propose a few changes to its API. They fix a bug,
> make parts of its semantics better defined and easier for users to get
> right, and reduce the assumptions made about the runner implementation.
>
> In short:
> - Add c.updateWatermark() and report the watermark continuously via this
> method.
> - Make SDF.@ProcessElement return void, which is simpler for users, though
> it no longer allows resuming after a specified time.
> - Declare that SDF.@ProcessElement must guarantee that, after it returns,
> the entire tracker.currentRestriction() has been processed.
> - Add a bool RestrictionTracker.done() method to enforce the bullet above.
> - For resuming after a specified time, use a regular DoFn with the state
> and timers API.
>
> The only downside is the removal (from SDF) of the ability to suspend the
> call for a certain amount of time. The suggestion is that, if you need
> that, you should use a regular DoFn and the timers API.
>
> Please see the full proposal in the following doc and comment there & vote
> on this thread.
>
> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing
>
>
> I am going to concurrently start prototyping some parts of this proposal,
> because the current implementation is simply wrong and this proposal is the
> only way to fix it that I can think of, but I will adjust my implementation
> based on the discussion. I believe this proposal should not affect runner
> authors - I can make all the necessary changes myself.
>
> Thanks!
>