Posted to dev@beam.apache.org by Lukasz Cwik <lc...@google.com> on 2018/09/05 20:53:48 UTC

Re: SplittableDoFn

Thanks to all those who have shown interest in this topic through the
questions they have already asked on the doc, and to those interested in
having this discussion. I have set up this Doodle poll to allow people to
provide their availability:
https://doodle.com/poll/nrw7w84255xnfwqy

I'll send out the chosen time based upon people's availability and a Hangout
link by end of day Friday, so please mark your availability using the link
above.

The agenda of the meeting will be as follows:
* Overview of the proposal
* Enumerate and discuss/answer questions brought up in the meeting

Note that all questions and any discussions/answers provided will be added
to the doc for those who are unable to attend.

On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> +1
>
> Regards
> JB
> On 31 Aug 2018, at 18:22, Lukasz Cwik <lc...@google.com> wrote:
>>
>> That is possible, I'll take people's date/time suggestions and create a
>> simple online poll with them.
>>
>> On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Thanks for taking this up. I added some comments to the doc. A
>>> European-friendly time for discussion would be great.
>>>
>>> On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> I came up with a proposal[1] for a progress model based solely on the
>>>> backlog, in which splits are expressed in terms of the remaining backlog
>>>> we want the SDK to split at. I also give recommendations to runner authors
>>>> on how an autoscaling system could work based upon the measured backlog.
>>>> Past discussions around progress reporting and splitting have always
>>>> focused on finding an optimal solution; after reading a lot about work
>>>> stealing, I don't believe there is a general solution, and it really is
>>>> up to SplittableDoFns to be well behaved. I did not do much work in
>>>> classifying what a well-behaved SplittableDoFn is, though. Much of this
>>>> work builds off ideas that Eugene had documented in the past[2].
>>>>
>>>> I could use the community's wide knowledge of different I/Os to see if
>>>> computing the backlog is practical in the way that I'm suggesting, and to
>>>> gather people's feedback.
>>>>
>>>> If there is a lot of interest, I would like to hold a community video
>>>> conference between Sept 10th and 14th about this topic. Please reply with
>>>> your availability by Sept 6th if you're interested.
>>>>
>>>> 1: https://s.apache.org/beam-bundles-backlog-splitting
>>>> 2: https://s.apache.org/beam-breaking-fusion
>>>>
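The proposal recommends that runners base autoscaling on the measured backlog. The Java sketch below shows one hypothetical shape such a policy could take: it assumes the runner knows a per-worker throughput and a target time in which the current backlog should be drained. The class name, parameters, and drain-time formula are illustrative assumptions, not the recommendations from the linked document.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical backlog-driven autoscaling policy. The formula (drain the
 * current backlog within a target time) is an illustrative assumption, not
 * the recommendation from the proposal document.
 */
public final class BacklogAutoscaler {
  private final int minWorkers;
  private final int maxWorkers;

  public BacklogAutoscaler(int minWorkers, int maxWorkers) {
    if (minWorkers < 1 || maxWorkers < minWorkers) {
      throw new IllegalArgumentException("invalid worker bounds");
    }
    this.minWorkers = minWorkers;
    this.maxWorkers = maxWorkers;
  }

  /**
   * @param backlog remaining work as reported by the SDK (bytes, messages, ...)
   * @param perWorkerThroughput backlog units one worker drains per second
   * @param targetDrainSeconds how soon the current backlog should be drained
   * @return the number of workers to request, clamped to [minWorkers, maxWorkers]
   */
  public int desiredWorkers(
      BigDecimal backlog, BigDecimal perWorkerThroughput, long targetDrainSeconds) {
    if (perWorkerThroughput.signum() <= 0 || targetDrainSeconds <= 0) {
      // No usable throughput signal yet: stay at the minimum.
      return minWorkers;
    }
    BigDecimal workPerWorker =
        perWorkerThroughput.multiply(BigDecimal.valueOf(targetDrainSeconds));
    // Round up so the requested pool can actually drain the backlog in time.
    BigDecimal needed = backlog.divide(workPerWorker, 0, RoundingMode.CEILING);
    if (needed.compareTo(BigDecimal.valueOf(maxWorkers)) >= 0) {
      return maxWorkers;
    }
    return Math.max(minWorkers, needed.intValue());
  }
}
```

A real policy would likely also smooth the signal over time and account for external throttling, so the clamped, rounded-up result here is only a starting point.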
>>>> On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
>>>> wrote:
>>>>
>>>>> Awesome !
>>>>>
>>>>> Thanks Luke !
>>>>>
>>>>> I plan to work with you and others on this one.
>>>>>
>>>>> Regards
>>>>> JB
>>>>> On 13 Aug 2018, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>> I wanted to let everyone know that I will be continuing from where
>>>>>> Eugene left off with SplittableDoFn. I know that many of you have done a
>>>>>> lot of work on IOs and/or runner integration for SplittableDoFn, and I
>>>>>> would appreciate your help in advancing this awesome idea. If you have
>>>>>> questions or things you want reviewed related to SplittableDoFn, feel
>>>>>> free to send them my way or include me on anything SplittableDoFn
>>>>>> related.
>>>>>>
>>>>>> I was part of several discussions with Eugene, and I think the biggest
>>>>>> outstanding design portion is figuring out how dynamic work rebalancing
>>>>>> would play out with the portability APIs. This includes reporting
>>>>>> progress from within a bundle. I know that Eugene had shared some
>>>>>> documents in this regard, but the position/split models didn't work too
>>>>>> cleanly in a unified sense for bounded and unbounded SplittableDoFns. It
>>>>>> will likely take me a while to gather my thoughts, but I could use your
>>>>>> expertise on how compatible these ideas are with respect to IOs and
>>>>>> runners (Flink/Spark/Dataflow/Samza/Apex/...), and obviously your help
>>>>>> during implementation.
>>>>>>
>>>>>

Re: SplittableDoFn

Posted by Alex Van Boxel <al...@vanboxel.be>.
Yes, but we need at least Mongo 4.0 to make it production ready. I wouldn't
let anyone work with anything less, because you can't checkpoint. I'm
waiting until our test cluster is on 4.0 to continue with this.

 _/
_/ Alex Van Boxel


On Wed, Oct 3, 2018 at 9:43 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Hello Alex, thanks for sharing, a really interesting quest story; we
> really need more like this (kudos for the animations too).
> Are you planning to contribute the continuous SDF-based version of the
> Mongo connector to Beam upstream (once ready)?
>
>
> On Wed, Oct 3, 2018 at 7:07 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
> >
> > Nice one Alex !
> >
> > Thanks
> > Regards
> > JB
> >
> > On 02/10/2018 23:19, Alex Van Boxel wrote:
> > > Don't want to crash the tech discussion here, but... I just gave a
> > > session at the Beam Summit about Splittable DoFns from a user's
> > > perspective (from things I could gather from the documentation and
> > > experimentation). Here is the slide deck, maybe it could be
> > > useful:
> > > https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
> > > (quite proud of the animations though ;-)
> > >
> > >  _/
> > > _/ Alex Van Boxel
> > >
> > >
> > > On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik <lcwik@google.com
> > > <ma...@google.com>> wrote:
> > >
> > >     Reuven, just inside the restriction tracker itself, which is scoped
> > >     per executing SplittableDoFn. A user could write the synchronization
> > >     incorrectly, though, since they are currently responsible for
> > >     writing it.
> > >
> > >     On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <relax@google.com
> > >     <ma...@google.com>> wrote:
> > >
> > >         Is synchronization over an entire work item, or just inside the
> > >         restriction tracker? My concern is that some runners (especially
> > >         streaming runners) might have hundreds or thousands of parallel
> > >         work items being processed for the same SDF (for different
> > >         keys), and I'm afraid of creating lock-contention bottlenecks.
> > >
> > >         On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <lcwik@google.com
> > >         <ma...@google.com>> wrote:
> > >
> > >             The synchronization is related to Java thread safety, since
> > >             concurrent access to a restriction tracker is likely needed
> > >             to properly handle reading the backlog and splitting while
> > >             the user's DoFn is executing and updating the restriction
> > >             tracker. This is similar to the Java thread safety needed in
> > >             BoundedSource and UnboundedSource for fraction consumed,
> > >             backlog bytes, and splitting.
> > >
> > >             On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <
> relax@google.com
> > >             <ma...@google.com>> wrote:
> > >
> > >                 Can you give details on what the synchronization is per?
> > >                 Is it per key, or global to each worker?
> > >
> > >                 On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik
> > >                 <lcwik@google.com <ma...@google.com>> wrote:
> > >
> > >                     As I was looking at the SplittableDoFn API while
> > >                     working towards a proposal for how the
> > >                     backlog/splitting API could look, I found some sharp
> > >                     edges that could be improved.
> > >
> > >                     I noticed that:
> > >                     1) We require users to write thread-safe code, which
> > >                     is something we haven't asked of users when
> > >                     writing a DoFn.
> > >                     2) We have "internal" methods within the
> > >                     RestrictionTracker that are not meant to be used by
> > >                     the runner.
> > >
> > >                     I can fix these issues by giving the user a
> > >                     forwarding restriction tracker[1] that provides an
> > >                     appropriate level of synchronization as needed and
> > >                     also provides the necessary observation hooks to see
> > >                     when a claim failed or succeeded.
> > >
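A forwarding tracker that owns the synchronization can be sketched in plain Java, assuming a simplified tracker contract. `Tracker`, `Range`, `OffsetTracker`, and `SynchronizedForwardingTracker` are hypothetical stand-ins, not Beam's `RestrictionTracker` API or the code in the linked PR. The wrapper takes one lock per tracker instance, so contention is limited to the thread running the DoFn and whichever thread asks for a split, and it reports every claim outcome to an observer.

```java
import java.util.function.BiConsumer;

/** Sketch of a synchronized forwarding tracker; all types are hypothetical. */
public final class ForwardingTrackerSketch {
  /** Minimal tracker contract used by this sketch. */
  public interface Tracker<R, P> {
    boolean tryClaim(P position);
    R currentRestriction();
    /** Splits off the unclaimed remainder and returns it, or null if none is left. */
    R trySplitRemainder();
  }

  /** Half-open offset range [from, to). */
  public static final class Range {
    public final long from;
    public final long to;
    public Range(long from, long to) { this.from = from; this.to = to; }
  }

  /** A user-written tracker with no synchronization of its own. */
  public static final class OffsetTracker implements Tracker<Range, Long> {
    private Range range;
    private long lastClaimed;

    public OffsetTracker(Range range) {
      this.range = range;
      this.lastClaimed = range.from - 1;
    }

    @Override public boolean tryClaim(Long offset) {
      // Offsets must be claimed in increasing order and inside the range.
      if (offset <= lastClaimed || offset >= range.to) return false;
      lastClaimed = offset;
      return true;
    }

    @Override public Range currentRestriction() { return range; }

    @Override public Range trySplitRemainder() {
      long splitAt = lastClaimed + 1;
      if (splitAt >= range.to) return null;
      Range residual = new Range(splitAt, range.to);
      range = new Range(range.from, splitAt); // primary keeps only the claimed part
      return residual;
    }
  }

  /**
   * Wraps any tracker: every call takes this wrapper's monitor (one lock per
   * tracker instance), and each claim outcome is passed to an observer.
   */
  public static final class SynchronizedForwardingTracker<R, P> implements Tracker<R, P> {
    private final Tracker<R, P> delegate;
    private final BiConsumer<P, Boolean> claimObserver;

    public SynchronizedForwardingTracker(
        Tracker<R, P> delegate, BiConsumer<P, Boolean> claimObserver) {
      this.delegate = delegate;
      this.claimObserver = claimObserver;
    }

    @Override public synchronized boolean tryClaim(P position) {
      boolean claimed = delegate.tryClaim(position);
      claimObserver.accept(position, claimed); // runs while holding the lock
      return claimed;
    }

    @Override public synchronized R currentRestriction() {
      return delegate.currentRestriction();
    }

    @Override public synchronized R trySplitRemainder() {
      return delegate.trySplitRemainder();
    }
  }
}
```

Because the user's tracker is only reachable through the wrapper, the user writes no synchronization; the trade-off is that the observer runs while the lock is held, so it should stay cheap.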
> > >                     This requires a change to our experimental API, since
> > >                     we need to pass a RestrictionTracker to the
> > >                     @ProcessElement method instead of a sub-type of
> > >                     RestrictionTracker:
> > >
> > >                     @ProcessElement
> > >                     public void processElement(ProcessContext context,
> > >                         OffsetRangeTracker tracker) { ... }
> > >
> > >                     becomes:
> > >
> > >                     @ProcessElement
> > >                     public void processElement(ProcessContext context,
> > >                         RestrictionTracker<OffsetRange, Long> tracker) { ... }
> > >
> > >                     This provides the additional benefit of preventing
> > >                     users from working around the RestrictionTracker
> > >                     APIs and potentially making underlying changes to
> > >                     the tracker outside of the tryClaim call.
> > >
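The benefit of passing the interface type can be illustrated with a hypothetical claim loop: because the method only receives the generic tracker type, `tryClaim` is the only way it can advance the restriction. `Tracker`, `OffsetRange`, and `SimpleOffsetTracker` are illustrative stand-ins, not Beam's classes, and `processElement` is only a sketch of what a `@ProcessElement` body might do.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative claim loop; all types are hypothetical stand-ins for Beam's. */
public final class ClaimLoopSketch {
  /** Generic tracker contract: the only thing the processing method sees. */
  public interface Tracker<R, P> {
    boolean tryClaim(P position);
    R currentRestriction();
  }

  /** Half-open offset range [from, to). */
  public static final class OffsetRange {
    public final long from;
    public final long to;
    public OffsetRange(long from, long to) { this.from = from; this.to = to; }
  }

  /** Concrete tracker whose internal state is hidden behind Tracker. */
  public static final class SimpleOffsetTracker implements Tracker<OffsetRange, Long> {
    private final OffsetRange range;
    private long nextToClaim;

    public SimpleOffsetTracker(OffsetRange range) {
      this.range = range;
      this.nextToClaim = range.from;
    }

    @Override public boolean tryClaim(Long offset) {
      // Claims must move forward and stay inside the range.
      if (offset < nextToClaim || offset >= range.to) return false;
      nextToClaim = offset + 1;
      return true;
    }

    @Override public OffsetRange currentRestriction() { return range; }
  }

  /** Emits one record per claimed offset, stopping at the first failed claim. */
  public static List<String> processElement(
      List<String> records, Tracker<OffsetRange, Long> tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); i++) {
      out.add(records.get((int) i));
    }
    return out;
  }
}
```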
> > >                     The full implementation is available in this PR[2],
> > >                     and I was wondering what people think.
> > >
> > >                     1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> > >                     2: https://github.com/apache/beam/pull/6467
> > >
> > >
> > >                     On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik
> > >                     <lcwik@google.com <ma...@google.com>>
> wrote:
> > >
> > >                         The changes to the API have not been proposed
> > >                         yet. So far it has all been about what the
> > >                         representation is and why.
> > >
> > >                         For splitting, the current idea has been to use
> > >                         the backlog as a way of telling the
> > >                         SplittableDoFn where to split, so it would be in
> > >                         terms of whatever the SDK decided to report.
> > >                         The runner always chooses a backlog number that
> > >                         is relative to the SDK's reported backlog. It
> > >                         would be up to the SDK to round/clamp the number
> > >                         given by the runner to represent something
> > >                         meaningful for itself.
> > >                         For example, if the SDK was reporting a backlog
> > >                         of 500 bytes remaining in a file, then the
> > >                         runner could provide some value like 212.2,
> > >                         which the SDK would then round to 212.
> > >                         If the SDK was reporting a backlog of 57
> > >                         Pub/Sub messages, then the runner could provide
> > >                         a value like 300, which would mean to read the 57
> > >                         messages and then another 243 as part of the
> > >                         current restriction.
> > >
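The rounding/clamping step in these two examples can be sketched as follows. This assumes the runner's number means "how much backlog the current restriction should keep", rounds down to whole units, and clamps bounded restrictions to what remains; the class and method names and the choice of `RoundingMode.FLOOR` are assumptions for illustration only, not Beam's API.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical SDK-side handling of a runner-chosen split backlog. */
public final class BacklogSplitSketch {
  /** How much work the current restriction keeps versus hands off. */
  public static final class BoundedSplit {
    public final long keep;
    public final long handOff;
    BoundedSplit(long keep, long handOff) { this.keep = keep; this.handOff = handOff; }
  }

  /** How much an unbounded restriction reads from the reported backlog and beyond it. */
  public static final class UnboundedPlan {
    public final long fromReported;
    public final long additional;
    UnboundedPlan(long fromReported, long additional) {
      this.fromReported = fromReported;
      this.additional = additional;
    }
  }

  /** Rounds the runner's value down to whole units and clamps it to [0, max]. */
  private static long toWholeUnits(BigDecimal requested, long max) {
    if (requested.signum() <= 0) return 0;
    BigDecimal whole = requested.setScale(0, RoundingMode.FLOOR);
    return whole.compareTo(BigDecimal.valueOf(max)) >= 0 ? max : whole.longValueExact();
  }

  /** Bounded case, e.g. bytes remaining in a file: keep at most what remains. */
  public static BoundedSplit splitBytes(long remainingBytes, BigDecimal requestedBacklog) {
    long keep = toWholeUnits(requestedBacklog, remainingBytes);
    return new BoundedSplit(keep, remainingBytes - keep);
  }

  /** Unbounded case, e.g. Pub/Sub messages: the runner may ask for more than is reported. */
  public static UnboundedPlan planMessages(long reportedMessages, BigDecimal requestedBacklog) {
    long target = toWholeUnits(requestedBacklog, Long.MAX_VALUE);
    long fromReported = Math.min(reportedMessages, target);
    return new UnboundedPlan(fromReported, target - fromReported);
  }
}
```

With these definitions, `splitBytes(500, new BigDecimal("212.2"))` keeps 212 bytes and hands off 288, and `planMessages(57, new BigDecimal("300"))` reads the 57 reported messages plus 243 more, matching the examples in the message.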
> > >                         I believe that BoundedSource/UnboundedSource
> > >                         will have wrappers added that provide a basic
> > >                         SplittableDoFn implementation, so existing IOs
> > >                         should be able to migrate without API changes.
> > >
> > >                         On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía
> > >                         <iemejia@gmail.com <ma...@gmail.com>>
> > >                         wrote:
> > >
> > >                             Thanks a lot, Luke, for bringing this back to
> > >                             the mailing list, and Ryan for taking
> > >                             the notes.
> > >
> > >                             I would like to know if there was some
> > >                             discussion, or if you have given
> > >                             some thought to the required changes in the
> > >                             SDK (API) part. What will be the
> > >                             equivalent of `splitAtFraction`, and what
> > >                             should IO authors do to support it?
> > >
> > >                             On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik
> > >                             <lcwik@google.com <mailto:lcwik@google.com>>
> > >                             wrote:
> > >                             >
> > >                             > Thanks to everyone who joined and for the
> > >                             > questions asked.
> > >                             >
> > >                             > Ryan graciously collected notes of the
> > >                             > discussion:
> > >                             > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
> > >                             >
> > >                             > The summary was that we should bring
> > >                             > BoundedSource/UnboundedSource onto a
> > >                             > unified backlog-reporting mechanism, with
> > >                             > other optional signals that Dataflow has
> > >                             > found useful (such as whether the remaining
> > >                             > restriction is splittable: yes, no, or
> > >                             > unknown). Other runners can use these
> > >                             > signals or not. At a minimum, SDFs should
> > >                             > report backlog and watermark. The backlog
> > >                             > should use an arbitrary-precision decimal
> > >                             > such as Java's BigDecimal to prevent issues
> > >                             > where limited precision removes the ability
> > >                             > to compute deltas efficiently.
> > >                             >
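The precision concern can be demonstrated with plain Java: once a backlog is large enough, a `double` cannot represent a small change in it, so the delta between two readings collapses to zero, while `BigDecimal` keeps it exact. The class name and the 2^60-byte backlog are arbitrary illustrative choices.

```java
import java.math.BigDecimal;

/** Compares backlog deltas computed with double versus BigDecimal. */
public final class BacklogPrecisionSketch {
  /** Work done between two backlog readings, using double arithmetic. */
  public static double deltaAsDouble(String before, String after) {
    return Double.parseDouble(before) - Double.parseDouble(after);
  }

  /** The same delta using arbitrary-precision decimals. */
  public static BigDecimal deltaAsBigDecimal(String before, String after) {
    return new BigDecimal(before).subtract(new BigDecimal(after));
  }

  public static void main(String[] args) {
    // A backlog of 2^60 bytes, then one more byte processed.
    String before = "1152921504606846976";
    String after = "1152921504606846975";
    System.out.println("double delta:     " + deltaAsDouble(before, after));
    System.out.println("BigDecimal delta: " + deltaAsBigDecimal(before, after));
  }
}
```

Running `main` prints a `double` delta of 0.0 and a `BigDecimal` delta of 1, even though one byte of progress was made.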
> > >                             >
> > >                             >
> > >                             > On Wed, Sep 12, 2018 at 3:54 PM Lukasz
> > >                             > Cwik <lcwik@google.com
> > >                             > <ma...@google.com>> wrote:
> > >                             >>
> > >                             >> Here is the link to join the discussion:
> > >                             >> https://meet.google.com/idc-japs-hwf
> > >                             >> Remember that it is this Friday, Sept 14th,
> > >                             >> from 11am to noon PST.
> > >                             >>
> > >                             >>
> > >                             >>
> > >                             >> On Mon, Sep 10, 2018 at 7:30 AM
> > >                             >> Maximilian Michels <mxm@apache.org
> > >                             >> <ma...@apache.org>> wrote:
> > >                             >>>
> > >                             >>> Thanks for moving forward with this, Lukasz!
> > >                             >>>
> > >                             >>> Unfortunately, I can't make it on Friday,
> > >                             >>> but I'll sync with somebody on the call
> > >                             >>> (e.g. Ryan) about your discussion.
> > >                             >>>
> > >                             >>> On 08.09.18 02:00, Lukasz Cwik wrote:
> > >                             >>> > Thanks to everyone who filled out the
> > >                             >>> > Doodle poll. The most popular time was
> > >                             >>> > Friday Sept 14th from 11am to noon PST.
> > >                             >>> > I'll send out a calendar invite and
> > >                             >>> > meeting link early next week.
> > >                             >>> >
> > >                             >>> > I have received a lot of feedback on
> > >                             >>> > the document and have addressed some
> > >                             >>> > parts of it, including:
> > >                             >>> > * clarifying terminology
> > >                             >>> > * processing skew due to some
> > >                             >>> >   restrictions having their watermarks
> > >                             >>> >   much further behind than others,
> > >                             >>> >   affecting the scheduling of bundles
> > >                             >>> >   by runners
> > >                             >>> > * external throttling & I/O wait
> > >                             >>> >   overhead reporting to make sure we
> > >                             >>> >   don't overscale
> > >                             >>> >
> > >                             >>> > Areas that still need additional
> > >                             >>> > feedback and details are:
> > >                             >>> > * reporting progress around the work
> > >                             >>> >   that is done and is active
> > >                             >>> > * more examples
> > >                             >>> > * unbounded restrictions being caused
> > >                             >>> >   by an unbounded number of splits of
> > >                             >>> >   existing unbounded restrictions
> > >                             >>> >   (infinite work growth)
> > >                             >>> > * whether we should be reporting this
> > >                             >>> >   information at the PTransform level
> > >                             >>> >   or at the bundle level
> > >                             >>> >
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbonofre@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>

Re: SplittableDoFn

Posted by Ismaël Mejía <ie...@gmail.com>.
Hello Alex, thanks for sharing, a really interesting quest story; we
really need more like this (kudos for the animations too).
Are you planning to contribute the continuous SDF-based version of the
Mongo connector to Beam upstream (once ready)?


On Wed, Oct 3, 2018 at 7:07 AM Jean-Baptiste Onofré <jb...@nanthrax.net> wrote:
>
> Nice one, Alex!
>
> Thanks
> Regards
> JB
>

Re: SplittableDoFn

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Nice one, Alex!

Thanks
Regards
JB

On 02/10/2018 23:19, Alex Van Boxel wrote:
> I don't want to crash the tech discussion here, but I just gave a
> session at the Beam Summit about Splittable DoFns from a user's
> perspective (based on what I could gather from the documentation and
> experimentation). Here is the slide deck; maybe it could be
> useful: https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing (quite
> proud of the animations, though ;-)
> 
>  _/
> _/ Alex Van Boxel
> 
> 
> On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik <lcwik@google.com
> <ma...@google.com>> wrote:
> 
>     Reuven, it is just inside the restriction tracker itself, which is
>     scoped per executing SplittableDoFn. A user could still write the
>     synchronization incorrectly, though, since they are currently
>     responsible for writing it.
> 
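A minimal Java sketch of the locking scope described here, assuming each tracker guards its own state with its own monitor (`synchronized` methods). `PerRestrictionTracker` and `ManyKeysDemo` are hypothetical names, not Beam classes; the sketch only shows that many concurrent work items (one per key) each take a different lock, so they do not contend with one another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical tracker over the offsets [from, to). Every method locks only
 * this tracker instance, i.e. the lock is scoped to one executing restriction.
 */
class PerRestrictionTracker {
  private final long to;
  private long next; // first offset that has not been claimed yet

  PerRestrictionTracker(long from, long to) {
    this.next = from;
    this.to = to;
  }

  /** Called by the thread running the user's DoFn. */
  synchronized boolean tryClaimNext() {
    if (next >= to) {
      return false;
    }
    next++;
    return true;
  }

  /** May be called concurrently by a runner thread that wants progress. */
  synchronized long backlog() {
    return to - next;
  }
}

class ManyKeysDemo {
  /**
   * Processes one restriction per key on a shared thread pool and returns the
   * total number of claimed offsets. No lock is shared between keys.
   */
  static long runAll(int keys, long offsetsPerKey, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<Long>> results = new ArrayList<>();
    try {
      for (int k = 0; k < keys; k++) {
        PerRestrictionTracker tracker = new PerRestrictionTracker(0, offsetsPerKey);
        Callable<Long> work = () -> {
          long processed = 0;
          while (tracker.tryClaimNext()) {
            processed++; // stand-in for emitting the record at this offset
          }
          return processed;
        };
        results.add(pool.submit(work));
      }
      long total = 0;
      for (Future<Long> result : results) {
        total += result.get();
      }
      return total;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Whether a given runner actually runs thousands of such trackers in one JVM is runner-specific; the sketch only illustrates that a per-tracker monitor is not a worker-global lock.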
>     On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <relax@google.com
>     <ma...@google.com>> wrote:
> 
>         Is synchronization over an entire work item, or just inside the
>         restriction tracker? My concern is that some runners (especially
>         streaming runners) might have hundreds or thousands of parallel
>         work items being processed for the same SDF (for different
>         keys), and I'm afraid of creating lock-contention bottlenecks.
> 
>         On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <lcwik@google.com
>         <ma...@google.com>> wrote:
> 
>             The synchronization is about Java thread safety: a
>             restriction tracker is likely to need concurrent access so
>             that the backlog can be read and a split performed while
>             the user's DoFn is executing and updating the restriction
>             tracker. This is similar to the Java thread safety needed
>             in BoundedSource and UnboundedSource for fraction consumed,
>             backlog bytes, and splitting.
> 
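To illustrate the kind of concurrent access meant here, the sketch below (hypothetical classes, not Beam's `OffsetRangeTracker`) has a DoFn thread claiming offsets while a runner thread polls the backlog and then splits. Because every method is `synchronized`, the split point is computed from the latest claimed offset, and the DoFn thread can never claim past the shrunken end.

```java
/**
 * Hypothetical offset tracker for [from, to). All methods are synchronized
 * because the DoFn thread (tryClaim) and a runner thread (backlog, trySplit)
 * may call them at the same time.
 */
class SyncOffsetTracker {
  private final long from;
  private long to;          // exclusive end; shrinks when a split succeeds
  private long lastClaimed; // from - 1 until the first successful claim

  SyncOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** DoFn thread: claims offset i; offsets must be claimed in increasing order. */
  synchronized boolean tryClaim(long i) {
    if (i < from || i <= lastClaimed) {
      throw new IllegalArgumentException("offsets must increase, got " + i);
    }
    if (i >= to) {
      return false;
    }
    lastClaimed = i;
    return true;
  }

  /** Runner thread: remaining offsets, read consistently with concurrent claims. */
  synchronized long backlog() {
    return to - (lastClaimed + 1);
  }

  /**
   * Runner thread: keeps at most `keep` unclaimed offsets in this restriction.
   * Returns the start of the residual [splitAt, oldEnd), or -1 if nothing is split off.
   */
  synchronized long trySplit(long keep) {
    long splitAt = lastClaimed + 1 + Math.max(0, keep);
    if (splitAt >= to) {
      return -1;
    }
    to = splitAt;
    return splitAt;
  }

  synchronized long lastClaimed() {
    return lastClaimed;
  }

  synchronized long end() {
    return to;
  }
}

class SplitRaceDemo {
  /** Runs a DoFn thread and a runner thread that splits once half the work is left. */
  static SyncOffsetTracker run(long size, long keepAfterSplit) {
    SyncOffsetTracker tracker = new SyncOffsetTracker(0, size);
    Thread dofn = new Thread(() -> {
      for (long i = 0; tracker.tryClaim(i); i++) {
        // stand-in for processing the record at offset i
      }
    });
    Thread runner = new Thread(() -> {
      while (tracker.backlog() > size / 2) {
        Thread.yield();
      }
      tracker.trySplit(keepAfterSplit);
    });
    dofn.start();
    runner.start();
    try {
      dofn.join();
      runner.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return tracker;
  }
}
```

Whether the split lands mid-stream or after the DoFn has finished depends on scheduling, but in both cases the last claimed offset ends up exactly one below the final end.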
>             On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <relax@google.com
>             <ma...@google.com>> wrote:
> 
>                 Can you give details on what the synchronization is per?
>                 Is it per key, or global to each worker?
> 
>                 On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik
>                 <lcwik@google.com <ma...@google.com>> wrote:
> 
>                     As I was looking at the SplittableDoFn API while
>                     working towards making a proposal for how the
>                     backlog/splitting API could look, I found some sharp
>                     edges that could be improved.
> 
>                     I noticed that:
>                     1) We require users to write thread-safe code, which
>                     is something that we haven't asked of users when
>                     writing a DoFn.
>                     2) We have "internal" methods within the
>                     RestrictionTracker that are not meant to be used by
>                     the runner.
> 
>                     I can fix these issues by giving the user a
>                     forwarding restriction tracker[1] that provides an
>                     appropriate level of synchronization as needed and
>                     also provides the necessary observation hooks to see
>                     when a claim failed or succeeded.
> 
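A rough sketch of the forwarding idea, assuming a reduced two-method tracker interface. `Tracker`, `ClaimObserver`, `Range`, `PlainRangeTracker` and `ForwardingTracker` are all hypothetical; the actual design is in the PR linked as [1]. The wrapper takes a single lock around every delegated call, so the user-written delegate needs no synchronization of its own, and it reports each claim result to an observer.

```java
/** Hypothetical minimal tracker interface; Beam's RestrictionTracker has more methods. */
interface Tracker<R, P> {
  boolean tryClaim(P position);

  R currentRestriction();
}

/** Hypothetical hook the runner could use to observe claim outcomes. */
interface ClaimObserver<P> {
  void onClaimSucceeded(P position);

  void onClaimFailed(P position);
}

/** An offset range [from, to). */
final class Range {
  final long from;
  final long to;

  Range(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

/** User-written tracker with no synchronization at all. */
final class PlainRangeTracker implements Tracker<Range, Long> {
  private final Range range;
  private long next; // first position that may still be claimed

  PlainRangeTracker(Range range) {
    this.range = range;
    this.next = range.from;
  }

  @Override
  public boolean tryClaim(Long position) {
    if (position < next) {
      throw new IllegalArgumentException("positions must increase, got " + position);
    }
    if (position >= range.to) {
      return false;
    }
    next = position + 1;
    return true;
  }

  @Override
  public Range currentRestriction() {
    return range;
  }
}

/** Forwards every call to the delegate under one lock and reports claim outcomes. */
final class ForwardingTracker<R, P> implements Tracker<R, P> {
  private final Tracker<R, P> delegate;
  private final ClaimObserver<P> observer;
  private final Object lock = new Object();

  ForwardingTracker(Tracker<R, P> delegate, ClaimObserver<P> observer) {
    this.delegate = delegate;
    this.observer = observer;
  }

  @Override
  public boolean tryClaim(P position) {
    synchronized (lock) {
      boolean claimed = delegate.tryClaim(position);
      if (claimed) {
        observer.onClaimSucceeded(position);
      } else {
        observer.onClaimFailed(position);
      }
      return claimed;
    }
  }

  @Override
  public R currentRestriction() {
    synchronized (lock) {
      return delegate.currentRestriction();
    }
  }
}
```

Because user code would only see the `Tracker<Range, Long>` interface, the runner is free to hand it the wrapper instead of the raw tracker, which is the motivation for the signature change that follows.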
>                     This requires a change to our experimental API since
>                     we need to pass a RestrictionTracker to the
>                     @ProcessElement method instead of a subtype of
>                     RestrictionTracker. For example:
>                     @ProcessElement
>                     public void processElement(ProcessContext context,
>                         OffsetRangeTracker tracker) { ... }
>                     becomes:
>                     @ProcessElement
>                     public void processElement(ProcessContext context,
>                         RestrictionTracker<OffsetRange, Long> tracker) { ... }
> 
>                     This has the additional benefit of preventing
>                     users from working around the RestrictionTracker
>                     APIs and potentially making underlying changes to
>                     the tracker outside of the tryClaim call.
> 
>                     The full implementation is available in this PR[2];
>                     I was wondering what people thought.
> 
>                     1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>                     2: https://github.com/apache/beam/pull/6467
> 
> 
>                     On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik
>                     <lcwik@google.com <ma...@google.com>> wrote:
> 
>                         The changes to the API have not been proposed
>                         yet. So far it has all been about what the
>                         representation is and why.
> 
>                         For splitting, the current idea has been about
>                         using the backlog as a way of telling the
>                         SplittableDoFn where to split, so it would be in
>                         terms of whatever the SDK decided to report.
>                         The runner always chooses a number for backlog
>                         that is relative to the SDK's reported backlog.
>                         It would be up to the SDK to round/clamp the
>                         number given by the Runner to represent
>                         something meaningful for itself.
>                         For example, if the backlog that the SDK was
>                         reporting was bytes remaining in a file, such as
>                         500, then the Runner could provide some value
>                         like 212.2, which the SDK would then round to 212.
>                         If the SDK was reporting a backlog of 57
>                         Pub/Sub messages, then the Runner could provide a
>                         value like 300, which would mean to read the 57
>                         messages and then another 243 as part of the
>                         current restriction.
> 
>                         I believe that BoundedSource/UnboundedSource
>                         will have wrappers added that provide a basic
>                         SplittableDoFn implementation so existing IOs
>                         should be migrated over without API changes.
> 
>                         On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía
>                         <iemejia@gmail.com <ma...@gmail.com>>
>                         wrote:
> 
>                             Thanks a lot Luke for bringing this back to
>                             the mailing list and Ryan for taking
>                             the notes.
> 
>                             I would like to know if there was some
>                             discussion, or if you guys have given
>                             some thought to the required changes in the
>                             SDK (API) part. What will be the
>                             equivalent of `splitAtFraction` and what
>                             should IO authors do to support it..
> 
>                             On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik
>                             <lcwik@google.com <ma...@google.com>>
>                             wrote:
>                             >
>                             > Thanks to everyone who joined and for the
>                             questions asked.
>                             >
>                             > Ryan graciously collected notes of the
>                             discussion:
>                             https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>                             >
>                             > The summary was that bringing
>                             BoundedSource/UnboundedSource into using a
>                             unified backlog-reporting mechanism with
>                             optional other signals that Dataflow has
>                             found useful (such as is the remaining
>                             restriction splittable (yes, no, unknown)).
>                             Other runners can use or not. SDFs should
>                             report backlog and watermark as minimum bar.
>                             The backlog should use an arbitrary
>                             precision float such as Java BigDecimal to
>                             prevent issues where limited precision
>                             removes the ability to compute delta
>                             efficiently.
>                             >
>                             >
>                             >
>                             > On Wed, Sep 12, 2018 at 3:54 PM Lukasz
>                             Cwik <lcwik@google.com
>                             <ma...@google.com>> wrote:
>                             >>
>                             >> Here is the link to join the discussion:
>                             https://meet.google.com/idc-japs-hwf
>                             >> Remember that it is this Friday Sept 14th
>                             from 11am-noon PST.
>                             >>
>                             >>
>                             >>
>                             >> On Mon, Sep 10, 2018 at 7:30 AM
>                             Maximilian Michels <mxm@apache.org
>                             <ma...@apache.org>> wrote:
>                             >>>
>                             >>> Thanks for moving forward with this, Lukasz!
>                             >>>
>                             >>> Unfortunately, can't make it on Friday
>                             but I'll sync with somebody on
>                             >>> the call (e.g. Ryan) about your discussion.
>                             >>>
>                             >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>                             >>> > Thanks for everyone who wanted to fill
>                             out the doodle poll. The most
>                             >>> > popular time was Friday Sept 14th from
>                             11am-noon PST. I'll send out a
>                             >>> > calendar invite and meeting link early
>                             next week.
>                             >>> >
>                             >>> > I have received a lot of feedback on
>                             the document and have addressed
>                             >>> > some parts of it including:
>                             >>> > * clarifying terminology
>                             >>> > * processing skew due to some
>                             restrictions having their watermarks much
>                             >>> > further behind then others affecting
>                             scheduling of bundles by runners
>                             >>> > * external throttling & I/O wait
>                             overhead reporting to make sure we
>                             >>> > don't overscale
>                             >>> >
>                             >>> > Areas that still need additional
>                             feedback and details are:
>                             >>> > * reporting progress around the work
>                             that is done and is active
>                             >>> > * more examples
>                             >>> > * unbounded restrictions being caused
>                             by an unbounded number of splits
>                             >>> > of existing unbounded restrictions
>                             (infinite work growth)
>                             >>> > * whether we should be reporting this
>                             information at the PTransform
>                             >>> > level or at the bundle level
>                             >>> >
>                             >>> >
>                             >>> >
>                             >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz
>                             Cwik <lcwik@google.com <ma...@google.com>
>                             >>> > <mailto:lcwik@google.com
>                             <ma...@google.com>>> wrote:
>                             >>> >
>                             >>> >     Thanks to all those who have
>                             provided interest in this topic by the
>                             >>> >     questions they have asked on the
>                             doc already and for those
>                             >>> >     interested in having this
>                             discussion. I have setup this doodle to
>                             >>> >     allow people to provide their
>                             availability:
>                             >>> >   
>                              https://doodle.com/poll/nrw7w84255xnfwqy
>                             >>> >
>                             >>> >     I'll send out the chosen time
>                             based upon peoples availability and a
>                             >>> >     Hangout link by end of day Friday
>                             so please mark your availability
>                             >>> >     using the link above.
>                             >>> >
>                             >>> >     The agenda of the meeting will be
>                             as follows:
>                             >>> >     * Overview of the proposal
>                             >>> >     * Enumerate and discuss/answer
>                             questions brought up in the meeting
>                             >>> >
>                             >>> >     Note that all questions and any
>                             discussions/answers provided will be
>                             >>> >     added to the doc for those who are
>                             unable to attend.
>                             >>> >
>                             >>> >     On Fri, Aug 31, 2018 at 9:47 AM
>                             Jean-Baptiste Onofré
>                             >>> >     <jb@nanthrax.net
>                             <ma...@nanthrax.net>
>                             <mailto:jb@nanthrax.net
>                             <ma...@nanthrax.net>>> wrote:
>                             >>> >
>                             >>> >         +1
>                             >>> >
>                             >>> >         Regards
>                             >>> >         JB
>                             >>> >         Le 31 août 2018, à 18:22,
>                             Lukasz Cwik <lcwik@google.com
>                             <ma...@google.com>
>                             >>> >         <mailto:lcwik@google.com
>                             <ma...@google.com>>> a écrit:
>                             >>> >
>                             >>> >             That is possible, I'll
>                             take people's date/time suggestions
>                             >>> >             and create a simple online
>                             poll with them.
>                             >>> >
>                             >>> >             On Fri, Aug 31, 2018 at
>                             2:22 AM Robert Bradshaw
>                             >>> >             <robertwb@google.com
>                             <ma...@google.com>
>                             <mailto:robertwb@google.com
>                             <ma...@google.com>>> wrote:
>                             >>> >
>                             >>> >                 Thanks for taking this
>                             up. I added some comments to the
>                             >>> >                 doc. A
>                             European-friendly time for discussion would
>                             >>> >                 be great.
>                             >>> >
>                             >>> >                 On Fri, Aug 31, 2018
>                             at 3:14 AM Lukasz Cwik
>                             >>> >                 <lcwik@google.com
>                             <ma...@google.com>
>                             <mailto:lcwik@google.com
>                             <ma...@google.com>>> wrote:
>                             >>> >
>                             >>> >                     I came up with a
>                             proposal[1] for a progress model
>                             >>> >                     solely based off
>                             of the backlog and that splits
>                             >>> >                     should be based
>                             upon the remaining backlog we want
>                             >>> >                     the SDK to split
>                             at. I also give recommendations to
>                             >>> >                     runner authors as
>                             to how an autoscaling system could
>                             >>> >                     work based upon
>                             the measured backlog. A lot of
>                             >>> >                     discussions around
>                             progress reporting and splitting
>                             >>> >                     in the past has
>                             always been around finding an
>                             >>> >                     optimal solution,
>                             after reading a lot of information
>                             >>> >                     about work
>                             stealing, I don't believe there is a
>                             >>> >                     general solution
>                             and it really is upto
>                             >>> >                     SplittableDoFns to
>                             be well behaved. I did not do
>                             >>> >                     much work in
>                             classifying what a well behaved
>                             >>> >                     SplittableDoFn is
>                             though. Much of this work builds
>                             >>> >                     off ideas that
>                             Eugene had documented in the past[2].
>                             >>> >
>                             >>> >                     I could use the
>                             communities wide knowledge of
>                             >>> >                     different I/Os to
>                             see if computing the backlog is
>                             >>> >                     practical in the
>                             way that I'm suggesting and to
>                             >>> >                     gather people's
>                             feedback.
>                             >>> >
>                             >>> >                     If there is a lot
>                             of interest, I would like to hold
>                             >>> >                     a community video
>                             conference between Sept 10th and
>                             >>> >                     14th about this
>                             topic. Please reply with your
>                             >>> >                     availability by
>                             Sept 6th if your interested.
>                             >>> >
>                             >>> >                     1:
>                             https://s.apache.org/beam-bundles-backlog-splitting
>                             >>> >                     2:
>                             https://s.apache.org/beam-breaking-fusion
>                             >>> >
>                             >>> >                     On Mon, Aug 13,
>                             2018 at 10:21 AM Jean-Baptiste
>                             >>> >                     Onofré
>                             <jb@nanthrax.net <ma...@nanthrax.net>
>                             <mailto:jb@nanthrax.net
>                             <ma...@nanthrax.net>>> wrote:
>                             >>> >
>                             >>> >                         Awesome !
>                             >>> >
>                             >>> >                         Thanks Luke !
>                             >>> >
>                             >>> >                         I plan to work
>                             with you and others on this one.
>                             >>> >
>                             >>> >                         Regards
>                             >>> >                         JB
>                             >>> >                         Le 13 août
>                             2018, à 19:14, Lukasz Cwik
>                             >>> >                       
>                              <lcwik@google.com <ma...@google.com>
>                             <mailto:lcwik@google.com
>                             <ma...@google.com>>> a
>                             >>> >                         écrit:
>                             >>> >
>                             >>> >                             I wanted
>                             to reach out that I will be
>                             >>> >                             continuing
>                             from where Eugene left off with
>                             >>> >                           
>                              SplittableDoFn. I know that many of you have
>                             >>> >                             done a
>                             bunch of work with IOs and/or runner
>                             >>> >                           
>                              integration for SplittableDoFn and would
>                             >>> >                             appreciate
>                             your help in advancing this
>                             >>> >                             awesome
>                             idea. If you have questions or
>                             >>> >                             things you
>                             want to get reviewed related to
>                             >>> >                           
>                              SplittableDoFn, feel free to send them my
>                             >>> >                             way or
>                             include me on anything SplittableDoFn
>                             >>> >                             related.
>                             >>> >
>                             >>> >                             I was part
>                             of several discussions with
>                             >>> >                             Eugene and
>                             I think the biggest outstanding
>                             >>> >                             design
>                             portion is to figure out how dynamic
>                             >>> >                             work
>                             rebalancing would play out with the
>                             >>> >                           
>                              portability APIs. This includes reporting of
>                             >>> >                             progress
>                             from within a bundle. I know that
>                             >>> >                             Eugene had
>                             shared some documents in this
>                             >>> >                             regard but
>                             the position / split models
>                             >>> >                             didn't
>                             work too cleanly in a unified sense
>                             >>> >                             for
>                             bounded and unbounded SplittableDoFns.
>                             >>> >                             It will
>                             likely take me awhile to gather my
>                             >>> >                             thoughts
>                             but could use your expertise as to
>                             >>> >                             how
>                             compatible these ideas are with respect
>                             >>> >                             to to IOs
>                             and runners
>                             >>> >                           
>                              Flink/Spark/Dataflow/Samza/Apex/... and
>                             >>> >                             obviously
>                             help during implementation.
>                             >>> >
> 

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
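
The forwarding restriction tracker proposal quoted in this thread can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Beam code: `Tracker`, `OffsetRange`, `SimpleOffsetTracker`, `SyncForwardingTracker`, and `ForwardingTrackerDemo` are hypothetical stand-ins and do not reproduce Beam's `RestrictionTracker`, `OffsetRange`, or `OffsetRangeTracker`. The user-facing method receives only the interface type. Every call goes through one lock that is shared with a simplified runner-side split, and each claim result is reported to an observer hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical user-facing interface; NOT Beam's RestrictionTracker.
interface Tracker<R, P> {
  boolean tryClaim(P position);
  R currentRestriction();
}

// Hypothetical half-open offset range [from, to).
final class OffsetRange {
  final long from, to;
  OffsetRange(long from, long to) { this.from = from; this.to = to; }
}

// Non-thread-safe tracker, written as if only one thread ever touches it.
final class SimpleOffsetTracker implements Tracker<OffsetRange, Long> {
  private OffsetRange range;
  private long lastClaimed;

  SimpleOffsetTracker(OffsetRange range) { this.range = range; this.lastClaimed = range.from - 1; }

  @Override public boolean tryClaim(Long offset) {
    if (offset <= lastClaimed) throw new IllegalArgumentException("claims must increase");
    if (offset >= range.to) return false; // outside the (possibly shrunk) restriction
    lastClaimed = offset;
    return true;
  }

  @Override public OffsetRange currentRestriction() { return range; }

  // Keeps [from, splitPos) as primary and returns [splitPos, to) as residual, or null if impossible.
  OffsetRange splitAt(long splitPos) {
    if (splitPos <= lastClaimed || splitPos >= range.to) return null;
    OffsetRange residual = new OffsetRange(splitPos, range.to);
    range = new OffsetRange(range.from, splitPos);
    return residual;
  }
}

// Handed to user code: every call shares one lock with the runner-side split,
// and each claim attempt is reported to an observer hook.
final class SyncForwardingTracker implements Tracker<OffsetRange, Long> {
  private final SimpleOffsetTracker delegate;
  private final BiConsumer<Long, Boolean> claimObserver;

  SyncForwardingTracker(SimpleOffsetTracker delegate, BiConsumer<Long, Boolean> claimObserver) {
    this.delegate = delegate;
    this.claimObserver = claimObserver;
  }

  @Override public synchronized boolean tryClaim(Long position) {
    boolean claimed = delegate.tryClaim(position);
    claimObserver.accept(position, claimed);
    return claimed;
  }

  @Override public synchronized OffsetRange currentRestriction() { return delegate.currentRestriction(); }

  // Runner-side entry point; not on the Tracker interface that user code receives.
  synchronized OffsetRange splitAt(long splitPos) { return delegate.splitAt(splitPos); }
}

final class ForwardingTrackerDemo {
  // The user's method sees only the interface type, so it cannot reach splitAt or the delegate.
  static void processElement(Tracker<OffsetRange, Long> tracker) {
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); i++) {
      // ... output element i ...
    }
  }

  public static void main(String[] args) {
    List<String> claims = new ArrayList<>();
    SyncForwardingTracker tracker = new SyncForwardingTracker(
        new SimpleOffsetTracker(new OffsetRange(0, 10)), (pos, ok) -> claims.add(pos + ":" + ok));
    OffsetRange residual = tracker.splitAt(5); // runner split; the lock also allows it mid-processing
    processElement(tracker);
    System.out.println(residual.from + "-" + residual.to + " " + claims);
    // prints: 5-10 [0:true, 1:true, 2:true, 3:true, 4:true, 5:false]
  }
}
```

With this design, a user's DoFn can be written as single-threaded code. Changing the parameter type from the concrete tracker to the interface is what removes direct access to methods like the split.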

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
It turned out that providing direct access only to the RestrictionTracker,
and not to the ByteKeyRangeTracker, prevented the use of a "markDone"
method. Further investigation suggests that markDone had been hiding a bug:
we assumed that, lexicographically, the byte key { 0x02 } was the smallest
key greater than { 0x01 }, when it should have been { 0x01 0x00 }. I will
need to investigate further, but we could actually be missing keys during
processing if the next key based upon the old calculation was greater than
the end key.
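
To make the key ordering concrete, here is a minimal Java sketch. It is not Beam's `ByteKey` or `ByteKeyRangeTracker` code, and the class and method names are hypothetical. It compares byte keys lexicographically as unsigned bytes, with a shorter prefix sorting first. It then computes the smallest key strictly greater than a given key by appending a single 0x00 byte. The result shows that { 0x01 0x00 } lies strictly between { 0x01 } and { 0x02 }, so treating { 0x02 } as the successor of { 0x01 } skips keys such as { 0x01 0x00 } and { 0x01 0xff }.

```java
import java.util.Arrays;

// Hypothetical helper, not Beam's ByteKey API: unsigned lexicographic byte-key order.
final class ByteKeyOrder {
  private ByteKeyOrder() {}

  // Compares bytes as unsigned values; if one key is a prefix of the other, the shorter sorts first.
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  // The smallest key strictly greater than `key` is `key` followed by one 0x00 byte.
  static byte[] successor(byte[] key) {
    return Arrays.copyOf(key, key.length + 1); // copyOf pads the new trailing byte with 0x00
  }

  public static void main(String[] args) {
    byte[] k = {0x01};
    byte[] oldGuess = {0x02};
    byte[] next = successor(k);
    byte[] skipped = {0x01, (byte) 0xFF};

    System.out.println(Arrays.toString(next)); // [1, 0]
    // { 0x01 } < { 0x01 0x00 } < { 0x01 0xff } < { 0x02 }
    System.out.println(compare(k, next) < 0 && compare(next, skipped) < 0
        && compare(skipped, oldGuess) < 0); // true
  }
}
```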

On Fri, Sep 21, 2018 at 3:41 PM Lukasz Cwik <lc...@google.com> wrote:

> The synchronization is about Java thread safety: a restriction tracker is
> likely to be accessed concurrently, because the backlog may be read and
> splits may be made while the user's DoFn is executing and updating the
> restriction tracker. This is similar to the Java thread safety needed in
> BoundedSource and UnboundedSource for fraction consumed, backlog bytes, and
> splitting.
>
> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <re...@google.com> wrote:
>
>> Can you give details on what scope the synchronization has? Is it per
>> key, or global to each worker?
>>
>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> As I was looking at the SplittableDoFn API while working towards making
>>> a proposal for how the backlog/splitting API could look, I found some sharp
>>> edges that could be improved.
>>>
>>> I noticed that:
>>> 1) We require users to write thread-safe code, which is something we
>>> haven't asked of users when writing a DoFn.
>>> 2) We expose "internal" methods within the RestrictionTracker that are
>>> meant to be used only by the runner, not by users.
>>>
>>> I can fix these issues by giving the user a forwarding restriction
>>> tracker[1] that provides an appropriate level of synchronization as needed
>>> and also provides the necessary observation hooks to see when a claim
>>> failed or succeeded.
>>>
>>> This requires a change to our experimental API since we need to pass
>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
>>> RestrictionTracker.
>>> @ProcessElement
>>> public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
>>> becomes:
>>> @ProcessElement
>>> public void processElement(
>>>     ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>
>>> This provides an additional benefit that it prevents users from working
>>> around the RestrictionTracker APIs and potentially making underlying
>>> changes to the tracker outside of the tryClaim call.
>>>
>>> The full implementation is available in this PR[2], and I was wondering
>>> what people thought.
>>>
>>> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>> 2: https://github.com/apache/beam/pull/6467
>>>
>>>
>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> The changes to the API have not been proposed yet. So far it has all
>>>> been about what the representation is and why.
>>>>
>>>> For splitting, the current idea has been to use the backlog as a way of
>>>> telling the SplittableDoFn where to split, so it would be in terms of
>>>> whatever the SDK decided to report.
>>>> The runner always chooses a number for backlog that is relative to the
>>>> SDK's reported backlog. It would be up to the SDK to round/clamp the
>>>> number given by the Runner to represent something meaningful for itself.
>>>> For example, if the backlog that the SDK was reporting was 500 bytes
>>>> remaining in a file, then the Runner could provide some value like 212.2,
>>>> which the SDK would then round to 212.
>>>> If the backlog that the SDK was reporting was 57 Pub/Sub messages, then
>>>> the Runner could provide a value like 300, which would mean to read the
>>>> 57 messages and then another 243 as part of the current restriction.
>>>>
>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>> that provide a basic SplittableDoFn implementation, so existing IOs
>>>> should be able to migrate over without API changes.
>>>>
>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> Thanks a lot Luke for bringing this back to the mailing list and Ryan
>>>>> for taking the notes.
>>>>>
>>>>> I would like to know if there was some discussion, or if you guys have
>>>>> given some thought to the required changes in the SDK (API) part. What
>>>>> will be the equivalent of `splitAtFraction`, and what should IO authors
>>>>> do to support it?
>>>>>
>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>> >
>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>> >
>>>>> > Ryan graciously collected notes of the discussion:
>>>>> > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>> >
>>>>> > The summary was that BoundedSource/UnboundedSource should be brought
>>>>> > into a unified backlog-reporting mechanism, with optional other signals
>>>>> > that Dataflow has found useful (such as whether the remaining
>>>>> > restriction is splittable: yes, no, or unknown). Other runners can use
>>>>> > these signals or not. SDFs should report backlog and watermark as a
>>>>> > minimum bar. The backlog should use an arbitrary-precision number such
>>>>> > as Java BigDecimal to prevent issues where limited precision removes
>>>>> > the ability to compute deltas efficiently.
>>>>> >
>>>>> >
>>>>> >
>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>> >>
>>>>> >> Here is the link to join the discussion:
>>>>> >> https://meet.google.com/idc-japs-hwf
>>>>> >> Remember that it is this Friday Sept 14th from 11am-noon PST.
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>> >>>
>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>> >>>
>>>>> >>> Unfortunately, I can't make it on Friday, but I'll sync with somebody
>>>>> >>> on the call (e.g. Ryan) about your discussion.
>>>>> >>>
>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>> >>> > Thanks to everyone who wanted to fill out the doodle poll. The most
>>>>> >>> > popular time was Friday Sept 14th from 11am-noon PST. I'll send out
>>>>> >>> > a calendar invite and meeting link early next week.
>>>>> >>> >
>>>>> >>> > I have received a lot of feedback on the document and have
>>>>> >>> > addressed some parts of it, including:
>>>>> >>> > * clarifying terminology
>>>>> >>> > * processing skew due to some restrictions having their watermarks
>>>>> >>> >   much further behind than others, affecting the scheduling of
>>>>> >>> >   bundles by runners
>>>>> >>> > * external throttling & I/O wait overhead reporting to make sure
>>>>> >>> >   we don't overscale
>>>>> >>> >
>>>>> >>> > Areas that still need additional feedback and details are:
>>>>> >>> > * reporting progress around the work that is done and is active
>>>>> >>> > * more examples
>>>>> >>> > * unbounded restrictions being caused by an unbounded number of
>>>>> >>> >   splits of existing unbounded restrictions (infinite work growth)
>>>>> >>> > * whether we should be reporting this information at the
>>>>> >>> >   PTransform level or at the bundle level
>>>>> >>> >
>>>>> >>> >
>>>>> >>> >
>>>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <lcwik@google.com
>>>>> >>> > <ma...@google.com>> wrote:
>>>>> >>> >
>>>>> >>> >     Thanks to all those who have shown interest in this topic
>>>>> >>> >     through the questions they have already asked on the doc, and
>>>>> >>> >     to those interested in having this discussion. I have set up
>>>>> >>> >     this doodle poll to allow people to provide their availability:
>>>>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>>>>> >>> >
>>>>> >>> >     I'll send out the chosen time based upon people's availability
>>>>> >>> >     and a Hangout link by end of day Friday, so please mark your
>>>>> >>> >     availability using the link above.
>>>>> >>> >
>>>>> >>> >     The agenda of the meeting will be as follows:
>>>>> >>> >     * Overview of the proposal
>>>>> >>> >     * Enumerate and discuss/answer questions brought up in the
>>>>> >>> >       meeting
>>>>> >>> >
>>>>> >>> >     Note that all questions and any discussions/answers provided
>>>>> >>> >     will be added to the doc for those who are unable to attend.
>>>>> >>> >
>>>>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>>>>> >>> >     <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>>>>> >>> >
>>>>> >>> >         +1
>>>>> >>> >
>>>>> >>> >         Regards
>>>>> >>> >         JB
>>>>> >>> >         On 31 Aug 2018, at 18:22, Lukasz Cwik <lcwik@google.com
>>>>> >>> >         <ma...@google.com>> wrote:
>>>>> >>> >
>>>>> >>> >             That is possible. I'll take people's date/time
>>>>> >>> >             suggestions and create a simple online poll with them.
>>>>> >>> >
>>>>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>>>>> >>> >             <robertwb@google.com <ma...@google.com>> wrote:
>>>>> >>> >
>>>>> >>> >                 Thanks for taking this up. I added some comments to
>>>>> >>> >                 the doc. A European-friendly time for discussion
>>>>> >>> >                 would be great.
>>>>> >>> >
>>>>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>>>>> >>> >                 <lcwik@google.com <ma...@google.com>> wrote:
>>>>> >>> >
>>>>> >>> >                     I came up with a proposal[1] for a progress
>>>>> >>> >                     model based solely on the backlog, in which
>>>>> >>> >                     splits are based upon the remaining backlog we
>>>>> >>> >                     want the SDK to split at. I also give
>>>>> >>> >                     recommendations to runner authors as to how an
>>>>> >>> >                     autoscaling system could work based upon the
>>>>> >>> >                     measured backlog. Past discussions around
>>>>> >>> >                     progress reporting and splitting have always
>>>>> >>> >                     revolved around finding an optimal solution.
>>>>> >>> >                     After reading a lot of information about work
>>>>> >>> >                     stealing, I don't believe there is a general
>>>>> >>> >                     solution, and it really is up to
>>>>> >>> >                     SplittableDoFns to be well behaved. I did not
>>>>> >>> >                     do much work in classifying what a well-behaved
>>>>> >>> >                     SplittableDoFn is, though. Much of this work
>>>>> >>> >                     builds off ideas that Eugene had documented in
>>>>> >>> >                     the past[2].
>>>>> >>> >
>>>>> >>> >                     I could use the community's wide knowledge of
>>>>> >>> >                     different I/Os to see if computing the backlog
>>>>> >>> >                     is practical in the way that I'm suggesting,
>>>>> >>> >                     and to gather people's feedback.
>>>>> >>> >
>>>>> >>> >                     If there is a lot of interest, I would like to
>>>>> >>> >                     hold a community video conference between Sept
>>>>> >>> >                     10th and 14th about this topic. Please reply
>>>>> >>> >                     with your availability by Sept 6th if you're
>>>>> >>> >                     interested.
>>>>> >>> >
>>>>> >>> >                     1: https://s.apache.org/beam-bundles-backlog-splitting
>>>>> >>> >                     2: https://s.apache.org/beam-breaking-fusion
>>>>> >>> >
>>>>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM
>>>>> Jean-Baptiste
>>>>> >>> >                     Onofré <jb@nanthrax.net <mailto:
>>>>> jb@nanthrax.net>> wrote:
>>>>> >>> >
>>>>> >>> >                         Awesome !
>>>>> >>> >
>>>>> >>> >                         Thanks Luke !
>>>>> >>> >
>>>>> >>> >                         I plan to work with you and others on
>>>>> this one.
>>>>> >>> >
>>>>> >>> >                         Regards
>>>>> >>> >                         JB
>>>>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>>>>> >>> >                         <lcwik@google.com <mailto:
>>>>> lcwik@google.com>> a
>>>>> >>> >                         écrit:
>>>>> >>> >
>>>>> >>> >                             I wanted to reach out to say that I will be
>>>>> >>> >                             continuing from where Eugene left off with
>>>>> >>> >                             SplittableDoFn. I know that many of you have
>>>>> >>> >                             done a bunch of work with IOs and/or runner
>>>>> >>> >                             integration for SplittableDoFn, and I would
>>>>> >>> >                             appreciate your help in advancing this
>>>>> >>> >                             awesome idea. If you have questions or
>>>>> >>> >                             things you want reviewed related to
>>>>> >>> >                             SplittableDoFn, feel free to send them my
>>>>> >>> >                             way or include me on anything
>>>>> >>> >                             SplittableDoFn-related.
>>>>> >>> >
>>>>> >>> >                             I was part of several discussions with
>>>>> >>> >                             Eugene, and I think the biggest outstanding
>>>>> >>> >                             design portion is figuring out how dynamic
>>>>> >>> >                             work rebalancing would play out with the
>>>>> >>> >                             portability APIs. This includes reporting of
>>>>> >>> >                             progress from within a bundle. I know that
>>>>> >>> >                             Eugene had shared some documents in this
>>>>> >>> >                             regard, but the position/split models
>>>>> >>> >                             didn't work too cleanly in a unified sense
>>>>> >>> >                             for bounded and unbounded SplittableDoFns.
>>>>> >>> >                             It will likely take me a while to gather my
>>>>> >>> >                             thoughts, but I could use your expertise as
>>>>> >>> >                             to how compatible these ideas are with
>>>>> >>> >                             respect to IOs and runners
>>>>> >>> >                             (Flink/Spark/Dataflow/Samza/Apex/...), and
>>>>> >>> >                             obviously your help during implementation.
>>>>> >>> >
>>>>>
>>>>

Re: SplittableDoFn

Posted by Eugene Kirpichov <ki...@google.com>.
Very cool, thanks Alex!

On Tue, Oct 2, 2018 at 2:19 PM Alex Van Boxel <al...@vanboxel.be> wrote:

> Don't want to crash the tech discussion here, but... I just gave a session
> at the Beam Summit about Splittable DoFns from a user's perspective (based on
> what I could gather from the documentation and experimentation). Here is
> the slide deck; maybe it could be useful:
> https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
> (quite proud of the animations though ;-)
>
>  _/
>
> _/ Alex Van Boxel

Re: SplittableDoFn

Posted by Alex Van Boxel <al...@vanboxel.be>.
Don't want to crash the tech discussion here, but... I just gave a session
at the Beam Summit about Splittable DoFns from a user's perspective (based on
what I could gather from the documentation and experimentation). Here is
the slide deck; maybe it could be useful:
https://docs.google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
(quite proud of the animations though ;-)

 _/
_/ Alex Van Boxel


On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:

> Reuven, just inside the restriction tracker itself, which is scoped per
> executing SplittableDoFn. Since users are currently responsible for writing
> that synchronization, though, they could write it incorrectly.
>
> On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <re...@google.com> wrote:
>
>> Is synchronization over an entire work item, or just inside the restriction
>> tracker? My concern is that some runners (especially streaming runners)
>> might have hundreds or thousands of parallel work items being processed for
>> the same SDF (for different keys), and I'm afraid of creating
>> lock-contention bottlenecks.
>>
>> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> The synchronization is about Java thread safety: a restriction tracker is
>>> likely to need concurrent access so that the backlog can be read and splits
>>> made while the user's DoFn is executing and updating the restriction
>>> tracker. This is similar to the Java thread safety needed in BoundedSource
>>> and UnboundedSource for fraction consumed, backlog bytes, and splitting.
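To make the concurrency concern concrete, here is a minimal sketch, assuming a simplified offset-range tracker rather than Beam's actual OffsetRangeTracker. The user's DoFn calls tryClaim on its processing thread while a runner thread concurrently reads the backlog or requests a split, so every method that touches the shared state is synchronized on the tracker instance. The class SynchronizedOffsetTracker and its backlog/trySplit methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical, simplified offset-range tracker (not Beam's OffsetRangeTracker).
 * Every method touching mutable state is synchronized because the user's DoFn
 * (tryClaim) and the runner (backlog, trySplit) may call it from different threads.
 */
public class SynchronizedOffsetTracker {
  private long end;         // exclusive end; shrinks when the runner splits
  private long lastClaimed; // last offset successfully claimed by the DoFn

  public SynchronizedOffsetTracker(long start, long end) {
    this.end = end;
    this.lastClaimed = start - 1;
  }

  /** Called by the user's DoFn before processing each offset, in increasing order. */
  public synchronized boolean tryClaim(long offset) {
    if (offset >= end) {
      return false; // past the (possibly shrunk) end of the restriction
    }
    lastClaimed = offset;
    return true;
  }

  /** Called by the runner: number of offsets not yet claimed. */
  public synchronized long backlog() {
    return end - (lastClaimed + 1);
  }

  /**
   * Called by the runner: keep at most {@code keep} more offsets in this restriction.
   * Returns the start of the residual range, or -1 if no split was possible.
   */
  public synchronized long trySplit(long keep) {
    long splitPoint = lastClaimed + 1 + keep;
    if (keep < 0 || splitPoint >= end) {
      return -1;
    }
    end = splitPoint;
    return splitPoint;
  }

  public static void main(String[] args) throws InterruptedException {
    SynchronizedOffsetTracker tracker = new SynchronizedOffsetTracker(0, 1_000_000);
    CountDownLatch started = new CountDownLatch(1);
    long[] processed = new long[1];

    // "User" thread: claims and processes offsets until tryClaim returns false.
    Thread user = new Thread(() -> {
      started.countDown();
      for (long i = 0; tracker.tryClaim(i); i++) {
        processed[0]++;
      }
    });
    user.start();
    started.await();

    // "Runner" side (the main thread): split while the user thread is running.
    long residualStart = tracker.trySplit(100);
    user.join();

    // If the split succeeded, exactly the offsets before residualStart were processed.
    System.out.println("processed=" + processed[0] + ", residualStart=" + residualStart);
  }
}
```

Because the lock in this sketch is per tracker instance, contention is limited to the user thread and the runner thread working on the same restriction; trackers for other keys do not share it.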
>>>
>>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Can you give details on what the synchronization is per? Is it per key,
>>>> or global to each worker?
>>>>
>>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> As I was looking at the SplittableDoFn API while working towards
>>>>> making a proposal for how the backlog/splitting API could look, I found
>>>>> some sharp edges that could be improved.
>>>>>
>>>>> I noticed that:
>>>>> 1) We require users to write thread-safe code, which is something
>>>>> we haven't asked of users when writing a DoFn.
>>>>> 2) We expose "internal" methods within the RestrictionTracker that are not
>>>>> meant to be used by the runner.
>>>>>
>>>>> I can fix these issues by giving the user a forwarding restriction
>>>>> tracker[1] that provides an appropriate level of synchronization as needed
>>>>> and also provides the necessary observation hooks to see when a claim
>>>>> failed or succeeded.
>>>>>
>>>>> This requires a change to our experimental API, since we need to pass
>>>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
>>>>> RestrictionTracker:
>>>>>
>>>>> @ProcessElement
>>>>> public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
>>>>>
>>>>> becomes:
>>>>>
>>>>> @ProcessElement
>>>>> public void processElement(
>>>>>     ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>>>
>>>>> This provides an additional benefit that it prevents users from
>>>>> working around the RestrictionTracker APIs and potentially making
>>>>> underlying changes to the tracker outside of the tryClaim call.
>>>>>
>>>>> The full implementation is available in this PR[2]; I was wondering
>>>>> what people thought.
>>>>>
>>>>> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>>> 2: https://github.com/apache/beam/pull/6467
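The forwarding restriction tracker idea can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in the PR: SimpleTracker, ClaimObserver, Range, RangeTracker and ForwardingTracker are all hypothetical, simplified stand-ins for Beam's types. The wrapper serializes every call to the underlying tracker and reports each successful or failed claim to an observer, while the user-style processing code only ever sees the narrow interface, so it cannot call methods outside it, such as the hypothetical checkDone() on the concrete tracker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified types for illustration; not Beam's actual classes.
interface SimpleTracker<RestrictionT, PositionT> {
  boolean tryClaim(PositionT position);
  RestrictionT currentRestriction();
}

interface ClaimObserver<PositionT> {
  void onClaimed(PositionT position);
  void onClaimFailed(PositionT position);
}

final class Range {
  final long from;
  final long to; // exclusive
  Range(long from, long to) { this.from = from; this.to = to; }
}

/** Concrete tracker; checkDone() is not part of the interface the user sees. */
class RangeTracker implements SimpleTracker<Range, Long> {
  private final Range range;
  private long lastClaimed;

  RangeTracker(Range range) {
    this.range = range;
    this.lastClaimed = range.from - 1;
  }

  @Override public boolean tryClaim(Long position) {
    if (position <= lastClaimed || position >= range.to) {
      return false; // out of order or outside the restriction
    }
    lastClaimed = position;
    return true;
  }

  @Override public Range currentRestriction() { return range; }

  void checkDone() {
    if (lastClaimed != range.to - 1) {
      throw new IllegalStateException("Unclaimed work remains after " + lastClaimed);
    }
  }
}

/** Serializes access to any tracker and reports the outcome of every claim. */
class ForwardingTracker<R, P> implements SimpleTracker<R, P> {
  private final SimpleTracker<R, P> delegate;
  private final ClaimObserver<P> observer;

  ForwardingTracker(SimpleTracker<R, P> delegate, ClaimObserver<P> observer) {
    this.delegate = delegate;
    this.observer = observer;
  }

  @Override public synchronized boolean tryClaim(P position) {
    boolean claimed = delegate.tryClaim(position);
    if (claimed) {
      observer.onClaimed(position);
    } else {
      observer.onClaimFailed(position);
    }
    return claimed;
  }

  @Override public synchronized R currentRestriction() { return delegate.currentRestriction(); }
}

public class ForwardingTrackerExample {
  // User-style processing logic: it only sees the narrow SimpleTracker interface.
  static int process(SimpleTracker<Range, Long> tracker) {
    int count = 0;
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); i++) {
      count++;
    }
    return count;
  }

  static List<String> demo(long from, long to) {
    List<String> events = new ArrayList<>();
    RangeTracker internal = new RangeTracker(new Range(from, to));
    SimpleTracker<Range, Long> forUser = new ForwardingTracker<>(internal,
        new ClaimObserver<Long>() {
          @Override public void onClaimed(Long p) { events.add("claimed " + p); }
          @Override public void onClaimFailed(Long p) { events.add("failed " + p); }
        });
    events.add("processed " + process(forUser));
    internal.checkDone(); // runner-side call; not reachable through forUser
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo(10, 13));
  }
}
```

Running main prints the observed claim outcomes followed by the count: three successful claims, one failed claim at the end of the range, and "processed 3".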
>>>>>
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> The changes to the API have not been proposed yet. So far the discussion
>>>>>> has been about what the representation is and why.
>>>>>>
>>>>>> For splitting, the current idea is to use the backlog as a way of telling
>>>>>> the SplittableDoFn where to split, so the split would be expressed in
>>>>>> terms of whatever the SDK decided to report.
>>>>>> The runner always chooses a backlog value that is relative to the SDK's
>>>>>> reported backlog. It would be up to the SDK to round/clamp the number
>>>>>> given by the Runner to represent something meaningful for itself.
>>>>>> For example, if the SDK was reporting a backlog of 500 bytes remaining in
>>>>>> a file, the Runner could provide a value like 212.2, which the SDK would
>>>>>> then round to 212.
>>>>>> If the SDK was reporting a backlog of 57 Pub/Sub messages, the Runner
>>>>>> could provide a value like 300, which would mean reading those 57
>>>>>> messages and then another 243 as part of the current restriction.
>>>>>>
>>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>>>> that provide a basic SplittableDoFn implementation, so existing IOs should
>>>>>> be able to migrate without API changes.
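A minimal sketch of the backlog-relative splitting described above, assuming that the number the runner provides is the amount of backlog the current (primary) restriction should keep, and that the SDK rounds half-up to whole units and clamps the result to what remains. Both the rounding mode and the names BacklogSplitExample, splitOffset and additionalToRead are assumptions for illustration. The two calls in main reproduce the examples from the message.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical sketch of backlog-relative splitting. Assumes the runner's number is
 * the backlog the current (primary) restriction should keep, and that the SDK
 * rounds it half-up to whole units and clamps it to what is meaningful.
 */
public class BacklogSplitExample {

  /** Rounds the runner's requested backlog to a whole number of units. */
  static long roundRequest(BigDecimal requestedBacklog) {
    return requestedBacklog.setScale(0, RoundingMode.HALF_UP).longValueExact();
  }

  /**
   * Bounded case (e.g. bytes in a file): returns the offset at which to split, so the
   * primary keeps [nextUnclaimed, split) and the residual gets [split, end).
   */
  static long splitOffset(long nextUnclaimed, long end, BigDecimal requestedBacklog) {
    long remaining = end - nextUnclaimed;
    long keep = Math.max(0, Math.min(roundRequest(requestedBacklog), remaining));
    return nextUnclaimed + keep;
  }

  /**
   * Unbounded case (e.g. Pub/Sub messages): how many messages beyond the currently
   * reported backlog the current restriction should still read.
   */
  static long additionalToRead(long reportedBacklog, BigDecimal requestedBacklog) {
    return Math.max(0, roundRequest(requestedBacklog) - reportedBacklog);
  }

  public static void main(String[] args) {
    // 500 bytes remain in [1000, 1500); the runner asks to keep 212.2 of them.
    System.out.println(splitOffset(1000, 1500, new BigDecimal("212.2"))); // 1212
    // 57 messages are reported; the runner asks for 300, i.e. 243 more.
    System.out.println(additionalToRead(57, new BigDecimal("300")));      // 243
  }
}
```

Clamping keeps a runner request that exceeds the remaining bytes (or is negative) from producing a split point outside the restriction.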
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks a lot Luke for bringing this back to the mailing list, and
>>>>>>> Ryan for taking the notes.
>>>>>>>
>>>>>>> I would like to know if there was some discussion, or if you guys have
>>>>>>> given some thought to the required changes in the SDK (API) part. What
>>>>>>> will be the equivalent of `splitAtFraction`, and what should IO authors
>>>>>>> do to support it?
>>>>>>>
>>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>> >
>>>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>>>> >
>>>>>>> > Ryan graciously collected notes of the discussion:
>>>>>>> > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>>> >
>>>>>>> > The summary was that we should bring BoundedSource/UnboundedSource onto
>>>>>>> > a unified backlog-reporting mechanism, with optional additional signals
>>>>>>> > that Dataflow has found useful (such as whether the remaining
>>>>>>> > restriction is splittable: yes, no, or unknown). Other runners can use
>>>>>>> > these signals or not. At a minimum, SDFs should report backlog and
>>>>>>> > watermark. The backlog should use an arbitrary-precision number such as
>>>>>>> > Java's BigDecimal to prevent issues where limited precision removes the
>>>>>>> > ability to compute deltas efficiently.
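To illustrate why an arbitrary-precision type is suggested for the backlog, here is a small sketch (the class and method names are hypothetical). Two successive backlog reports that differ by one unit at a magnitude of 10^17 produce an exact delta of 1 with BigDecimal, but a delta of 0.0 with double, because a double's 53-bit significand cannot represent every integer that large.

```java
import java.math.BigDecimal;

/** Hypothetical sketch: computing the delta between two backlog reports. */
public class BacklogPrecisionExample {

  /** Exact delta using arbitrary-precision decimals. */
  static BigDecimal exactDelta(String previous, String current) {
    return new BigDecimal(previous).subtract(new BigDecimal(current));
  }

  /** Same delta with doubles, whose 53-bit significand cannot hold every large integer. */
  static double doubleDelta(String previous, String current) {
    return Double.parseDouble(previous) - Double.parseDouble(current);
  }

  public static void main(String[] args) {
    // Two successive backlog reports (e.g. bytes remaining) that differ by one unit.
    String previous = "100000000000000001";
    String current = "100000000000000000";
    System.out.println("BigDecimal delta: " + exactDelta(previous, current)); // 1
    System.out.println("double delta: " + doubleDelta(previous, current));    // 0.0
  }
}
```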
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>> >>
>>>>>>> >> Here is the link to join the discussion:
>>>>>>> >> https://meet.google.com/idc-japs-hwf
>>>>>>> >> Remember that it is this Friday, Sept 14th, from 11am to noon PST.
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <mxm@apache.org> wrote:
>>>>>>> >>>
>>>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>>>> >>>
>>>>>>> >>> Unfortunately, I can't make it on Friday, but I'll sync with somebody
>>>>>>> >>> on the call (e.g. Ryan) about your discussion.
>>>>>>> >>>
>>>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>>> >>> > Thanks to everyone who filled out the doodle poll. The most
>>>>>>> >>> > popular time was Friday, Sept 14th, from 11am to noon PST. I'll send
>>>>>>> >>> > out a calendar invite and meeting link early next week.
>>>>>>> >>> >
>>>>>>> >>> > I have received a lot of feedback on the document and have
>>>>>>> >>> > addressed some parts of it, including:
>>>>>>> >>> > * clarifying terminology
>>>>>>> >>> > * processing skew due to some restrictions having their watermarks
>>>>>>> >>> > much further behind than others, affecting scheduling of bundles by
>>>>>>> >>> > runners
>>>>>>> >>> > * external throttling & I/O wait overhead reporting to make sure we
>>>>>>> >>> > don't overscale
>>>>>>> >>> >
>>>>>>> >>> > Areas that still need additional feedback and details are:
>>>>>>> >>> > * reporting progress around the work that is done and is active
>>>>>>> >>> > * more examples
>>>>>>> >>> > * unbounded restrictions being caused by an unbounded number of
>>>>>>> >>> > splits of existing unbounded restrictions (infinite work growth)
>>>>>>> >>> > * whether we should be reporting this information at the PTransform
>>>>>>> >>> > level or at the bundle level
>>>>>>> >>> >
>>>>>>> >>> >
>>>>>>> >>> >
>>>>>>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <lcwik@google.com
>>>>>>> >>> > <ma...@google.com>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >     Thanks to all those who have shown interest in this topic through
>>>>>>> >>> >     the questions they have already asked on the doc, and to those
>>>>>>> >>> >     interested in having this discussion. I have set up this doodle to
>>>>>>> >>> >     allow people to provide their availability:
>>>>>>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>>>>>>> >>> >
>>>>>>> >>> >     I'll send out the chosen time based upon people's availability and
>>>>>>> >>> >     a Hangout link by end of day Friday, so please mark your
>>>>>>> >>> >     availability using the link above.
>>>>>>> >>> >
>>>>>>> >>> >     The agenda of the meeting will be as follows:
>>>>>>> >>> >     * Overview of the proposal
>>>>>>> >>> >     * Enumerate and discuss/answer questions brought up in the meeting
>>>>>>> >>> >
>>>>>>> >>> >     Note that all questions and any discussions/answers provided will
>>>>>>> >>> >     be added to the doc for those who are unable to attend.
>>>>>>> >>> >
>>>>>>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>>>>>>> >>> >     <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >         +1
>>>>>>> >>> >
>>>>>>> >>> >         Regards
>>>>>>> >>> >         JB
>>>>>>> >>> >         On Aug 31, 2018, at 18:22, Lukasz Cwik <lcwik@google.com
>>>>>>> >>> >         <ma...@google.com>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >             That is possible; I'll take people's date/time suggestions
>>>>>>> >>> >             and create a simple online poll with them.
>>>>>>> >>> >
>>>>>>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>>>>>>> >>> >             <robertwb@google.com <ma...@google.com>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                 Thanks for taking this up. I added some comments to
>>>>>>> >>> >                 the doc. A European-friendly time for discussion
>>>>>>> >>> >                 would be great.
>>>>>>> >>> >
>>>>>>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>>>>>>> >>> >                 <lcwik@google.com <ma...@google.com>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                     I came up with a proposal[1] for a progress model
>>>>>>> >>> >                     based solely on the backlog, in which splits are
>>>>>>> >>> >                     expressed in terms of the remaining backlog at
>>>>>>> >>> >                     which we want the SDK to split. I also give
>>>>>>> >>> >                     recommendations to runner authors as to how an
>>>>>>> >>> >                     autoscaling system could work based upon the
>>>>>>> >>> >                     measured backlog. A lot of past discussions
>>>>>>> >>> >                     around progress reporting and splitting have
>>>>>>> >>> >                     focused on finding an optimal solution; after
>>>>>>> >>> >                     reading a lot of information about work stealing,
>>>>>>> >>> >                     I don't believe there is a general solution, and
>>>>>>> >>> >                     it really is up to SplittableDoFns to be well
>>>>>>> >>> >                     behaved. I did not do much work in classifying
>>>>>>> >>> >                     what a well-behaved SplittableDoFn is, though.
>>>>>>> >>> >                     Much of this work builds off ideas that Eugene
>>>>>>> >>> >                     had documented in the past[2].
>>>>>>> >>> >
>>>>>>> >>> >                     I could use the community's wide knowledge of
>>>>>>> >>> >                     different I/Os to see if computing the backlog is
>>>>>>> >>> >                     practical in the way that I'm suggesting, and to
>>>>>>> >>> >                     gather people's feedback.
>>>>>>> >>> >
>>>>>>> >>> >                     If there is a lot of interest, I would like to hold
>>>>>>> >>> >                     a community video conference between Sept 10th and
>>>>>>> >>> >                     14th about this topic. Please reply with your
>>>>>>> >>> >                     availability by Sept 6th if you're interested.
>>>>>>> >>> >
>>>>>>> >>> >                     1: https://s.apache.org/beam-bundles-backlog-splitting
>>>>>>> >>> >                     2: https://s.apache.org/beam-breaking-fusion
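The proposal's autoscaling recommendations are not reproduced in this thread, so the following is only an illustrative sketch of one common backlog-driven heuristic, not the one in the proposal document: choose enough workers to drain the measured backlog within a target time, given a measured per-worker throughput, and clamp the result to configured bounds. BacklogAutoscalerExample, desiredWorkers and all of its parameters are hypothetical.

```java
/**
 * Hypothetical backlog-driven autoscaling heuristic, for illustration only: pick enough
 * workers to drain the measured backlog within a target time, clamped to bounds.
 */
public class BacklogAutoscalerExample {

  static int desiredWorkers(double backlog, double perWorkerThroughputPerSec,
      double targetDrainSeconds, int minWorkers, int maxWorkers) {
    if (perWorkerThroughputPerSec <= 0 || targetDrainSeconds <= 0) {
      throw new IllegalArgumentException("throughput and drain time must be positive");
    }
    // Work one worker can finish within the target time.
    double perWorkerCapacity = perWorkerThroughputPerSec * targetDrainSeconds;
    double needed = Math.ceil(backlog / perWorkerCapacity);
    return (int) Math.max(minWorkers, Math.min(maxWorkers, needed));
  }

  public static void main(String[] args) {
    // 1,200,000 records of backlog, 1,000 records/s per worker, drain within 60 s.
    System.out.println(desiredWorkers(1_200_000, 1_000, 60, 1, 100)); // 20
  }
}
```

A real runner would likely also need to account for signals such as external throttling before scaling up, since adding workers does not help when the bottleneck is outside the pipeline.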
>>>>>>> >>> >
>>>>>>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste
>>>>>>> >>> >                     Onofré <jb@nanthrax.net <mailto:jb@nanthrax.net>> wrote:
>>>>>>> >>> >
>>>>>>> >>> >                         Awesome!
>>>>>>> >>> >
>>>>>>> >>> >                         Thanks Luke!
>>>>>>> >>> >
>>>>>>> >>> >                         I plan to work with you and others on this one.
>>>>>>> >>> >
>>>>>>> >>> >                         Regards
>>>>>>> >>> >                         JB
>>>>>>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>>>>>>> >>> >                         <lcwik@google.com <mailto:
>>>>>>> lcwik@google.com>> a
>>>>>>> >>> >                         écrit:
>>>>>>> >>> >
>>>>>>> >>> >                             I wanted to reach out that I will
>>>>>>> be
>>>>>>> >>> >                             continuing from where Eugene left
>>>>>>> off with
>>>>>>> >>> >                             SplittableDoFn. I know that many
>>>>>>> of you have
>>>>>>> >>> >                             done a bunch of work with IOs
>>>>>>> and/or runner
>>>>>>> >>> >                             integration for SplittableDoFn and
>>>>>>> would
>>>>>>> >>> >                             appreciate your help in advancing
>>>>>>> this
>>>>>>> >>> >                             awesome idea. If you have
>>>>>>> questions or
>>>>>>> >>> >                             things you want to get reviewed
>>>>>>> related to
>>>>>>> >>> >                             SplittableDoFn, feel free to send
>>>>>>> them my
>>>>>>> >>> >                             way or include me on anything
>>>>>>> SplittableDoFn
>>>>>>> >>> >                             related.
>>>>>>> >>> >
>>>>>>> >>> >                             I was part of several discussions
>>>>>>> with
>>>>>>> >>> >                             Eugene and I think the biggest
>>>>>>> outstanding
>>>>>>> >>> >                             design portion is to figure out
>>>>>>> how dynamic
>>>>>>> >>> >                             work rebalancing would play out
>>>>>>> with the
>>>>>>> >>> >                             portability APIs. This includes
>>>>>>> reporting of
>>>>>>> >>> >                             progress from within a bundle. I
>>>>>>> know that
>>>>>>> >>> >                             Eugene had shared some documents
>>>>>>> in this
>>>>>>> >>> >                             regard but the position / split
>>>>>>> models
>>>>>>> >>> >                             didn't work too cleanly in a
>>>>>>> unified sense
>>>>>>> >>> >                             for bounded and unbounded
>>>>>>> SplittableDoFns.
>>>>>>> >>> >                             It will likely take me awhile to
>>>>>>> gather my
>>>>>>> >>> >                             thoughts but could use your
>>>>>>> expertise as to
>>>>>>> >>> >                             how compatible these ideas are
>>>>>>> with respect
>>>>>>> >>> >                             to to IOs and runners
>>>>>>> >>> >
>>>>>>>  Flink/Spark/Dataflow/Samza/Apex/... and
>>>>>>> >>> >                             obviously help during
>>>>>>> implementation.
>>>>>>> >>> >
>>>>>>>
>>>>>>

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
Reuven, the synchronization is just inside the restriction tracker itself,
which is scoped per executing SplittableDoFn. A user could still write the
synchronization incorrectly, though, since they are currently responsible
for writing it.
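A minimal sketch of what per-tracker synchronization could look like, assuming a simplified, hypothetical `SimpleTracker` interface (not Beam's actual `RestrictionTracker` API). The lock is the wrapper instance itself, so contention is limited to one executing restriction rather than shared per key or per worker, and the wrapper also observes whether each claim succeeded, in the spirit of the forwarding tracker discussed in this thread. `SimpleTracker`, `OffsetTracker`, and `SynchronizedForwardingTracker` are illustrative names only.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ForwardingTrackerDemo {
  public static void main(String[] args) throws InterruptedException {
    AtomicInteger failedClaims = new AtomicInteger();
    SimpleTracker<Long> tracker =
        new SynchronizedForwardingTracker<>(
            new OffsetTracker(0, 1_000),
            claimed -> {
              if (!claimed) {
                failedClaims.incrementAndGet();
              }
            });

    // "DoFn" thread: claims offsets until tryClaim reports the end of the restriction.
    Thread doFn = new Thread(() -> {
      for (long i = 0; tracker.tryClaim(i); i++) {
        // process record i
      }
    });
    doFn.start();
    // "Runner" thread (main): may safely read the backlog while the DoFn runs.
    System.out.println("backlog while running: " + tracker.backlog());
    doFn.join();
    System.out.println(
        "final backlog: " + tracker.backlog() + ", failed claims: " + failedClaims.get());
  }
}

// Hypothetical, simplified tracker contract; NOT Beam's RestrictionTracker API.
interface SimpleTracker<PositionT> {
  boolean tryClaim(PositionT position);

  long backlog();
}

// Hypothetical tracker over the offset range [from, to). Not thread safe on its own.
class OffsetTracker implements SimpleTracker<Long> {
  private final long to;
  private long nextUnclaimed;

  OffsetTracker(long from, long to) {
    this.to = to;
    this.nextUnclaimed = from;
  }

  @Override
  public boolean tryClaim(Long position) {
    if (position < nextUnclaimed) {
      throw new IllegalArgumentException("Offsets must be claimed in increasing order");
    }
    if (position >= to) {
      return false; // Outside the restriction: the DoFn must stop processing.
    }
    nextUnclaimed = position + 1;
    return true;
  }

  @Override
  public long backlog() {
    return Math.max(0, to - nextUnclaimed);
  }
}

// Forwarding wrapper handed to user code. The monitor is this wrapper instance, so
// only callers touching the same restriction ever contend for it.
class SynchronizedForwardingTracker<PositionT> implements SimpleTracker<PositionT> {
  private final SimpleTracker<PositionT> delegate;
  private final Consumer<Boolean> claimObserver; // Told whether each claim succeeded.

  SynchronizedForwardingTracker(
      SimpleTracker<PositionT> delegate, Consumer<Boolean> claimObserver) {
    this.delegate = delegate;
    this.claimObserver = claimObserver;
  }

  @Override
  public synchronized boolean tryClaim(PositionT position) {
    boolean claimed = delegate.tryClaim(position);
    claimObserver.accept(claimed);
    return claimed;
  }

  @Override
  public synchronized long backlog() {
    return delegate.backlog();
  }
}
```

Because user code only sees the `SimpleTracker` interface, it cannot reach methods that exist only on the wrapped `OffsetTracker`. In this sketch the final backlog is 0 and exactly one claim (offset 1000) fails.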

On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax <re...@google.com> wrote:

> Is synchronization over an entire work item, or just inside the restriction
> tracker? My concern is that some runners (especially streaming runners)
> might have hundreds or thousands of parallel work items being processed for
> the same SDF (for different keys), and I'm afraid of creating
> lock-contention bottlenecks.
>
> On Fri, Sep 21, 2018 at 3:42 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> The synchronization is about Java thread safety: a restriction tracker is
>> likely to need concurrent access so that the backlog can be read and a split
>> handled while the user's DoFn is executing and updating the restriction
>> tracker. This is similar to the Java thread safety needed in BoundedSource
>> and UnboundedSource for fraction consumed, backlog bytes, and splitting.
>>
>> On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Can you give details on what the synchronization is per? Is it per key,
>>> or global to each worker?
>>>
>>> On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> As I was looking at the SplittableDoFn API while working towards making
>>>> a proposal for how the backlog/splitting API could look, I found some sharp
>>>> edges that could be improved.
>>>>
>>>> I noticed that:
>>>> 1) We require users to write thread-safe code, which is something that
>>>> we haven't asked of users when writing a DoFn.
>>>> 2) We expose "internal" methods within the RestrictionTracker that are
>>>> only meant to be used by the runner.
>>>>
>>>> I can fix these issues by giving the user a forwarding restriction
>>>> tracker[1] that provides an appropriate level of synchronization as needed
>>>> and also provides the necessary observation hooks to see when a claim
>>>> failed or succeeded.
>>>>
>>>> This requires a change to our experimental API, since we need to pass
>>>> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
>>>> RestrictionTracker:
>>>> @ProcessElement
>>>> public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
>>>> becomes:
>>>> @ProcessElement
>>>> public void processElement(ProcessContext context,
>>>>     RestrictionTracker<OffsetRange, Long> tracker) { ... }
>>>>
>>>> This has the additional benefit of preventing users from working
>>>> around the RestrictionTracker APIs and potentially making underlying
>>>> changes to the tracker outside of the tryClaim call.
>>>>
>>>> The full implementation is available in this PR[2]; I was wondering
>>>> what people thought.
>>>>
>>>> 1: https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
>>>> 2: https://github.com/apache/beam/pull/6467
>>>>
>>>>
>>>> On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> The changes to the API have not been proposed yet. So far it has all
>>>>> been about what the representation is and why.
>>>>>
>>>>> For splitting, the current idea has been to use the backlog as a way of
>>>>> telling the SplittableDoFn where to split, so the split would be in terms
>>>>> of whatever the SDK decided to report.
>>>>> The runner always chooses a backlog value that is relative to the SDK's
>>>>> reported backlog. It would be up to the SDK to round/clamp the number
>>>>> given by the runner to something meaningful for itself.
>>>>> For example, if the SDK was reporting a backlog of 500 bytes remaining
>>>>> in a file, then the runner could provide a value like 212.2, which the
>>>>> SDK would then round to 212.
>>>>> If the SDK was reporting a backlog of 57 Pub/Sub messages, then the
>>>>> runner could provide a value like 300, which would mean reading the 57
>>>>> messages and then another 243 as part of the current restriction.
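A minimal sketch of how an SDK might turn a runner-chosen backlog into a split, assuming the number means "how much backlog the current restriction should keep" (the reading suggested by the 57-message example above) and that half-up rounding is acceptable. `BacklogSplit` and its methods are hypothetical and not part of any Beam API.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper: converts a runner-provided backlog (relative to what the SDK
// reported) into a concrete amount of work for the current restriction to keep.
class BacklogSplit {
  // Rounds the runner's value to a whole unit (bytes, messages, ...) and clamps it at 0.
  static long toUnits(BigDecimal requested) {
    long units = requested.setScale(0, RoundingMode.HALF_UP).longValueExact();
    return Math.max(0, units);
  }

  // Bounded case: an offset range [position, end). The primary keeps `keep` units and
  // the residual [splitAt, end) is handed back. The split offset is clamped to the range.
  static long splitOffset(long position, long end, BigDecimal requested) {
    long keep = Math.min(toUnits(requested), end - position);
    return position + keep;
  }

  // Unbounded case: nothing to clamp against, so the current restriction processes the
  // requested amount, which may exceed what is available right now.
  static long additionalBeyondAvailable(long available, BigDecimal requested) {
    return Math.max(0, toUnits(requested) - available);
  }

  public static void main(String[] args) {
    // File example: 500 bytes remain in [500, 1000); the runner asks for 212.2.
    long splitAt = splitOffset(500, 1000, new BigDecimal("212.2"));
    System.out.println("primary keeps [500, " + splitAt + "), residual [" + splitAt + ", 1000)");
    // Message example: 57 messages reported; the runner asks for 300.
    System.out.println(
        "read 57, then " + additionalBeyondAvailable(57, new BigDecimal("300")) + " more");
  }
}
```

This prints `primary keeps [500, 712), residual [712, 1000)` and `read 57, then 243 more`. Clamping only in the bounded case reflects the idea that the SDK, not the runner, decides what a requested value means for its own restriction.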
>>>>>
>>>>> I believe that BoundedSource/UnboundedSource will have wrappers added
>>>>> that provide a basic SplittableDoFn implementation, so existing IOs
>>>>> should be able to migrate over without API changes.
>>>>>
>>>>> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>>
>>>>>> Thanks a lot, Luke, for bringing this back to the mailing list, and Ryan
>>>>>> for taking the notes.
>>>>>>
>>>>>> I would like to know if there was some discussion, or if you guys have
>>>>>> given some thought to the required changes in the SDK (API) part. What
>>>>>> will be the equivalent of `splitAtFraction`, and what should IO authors
>>>>>> do to support it?
>>>>>>
>>>>>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>> >
>>>>>> > Thanks to everyone who joined and for the questions asked.
>>>>>> >
>>>>>> > Ryan graciously collected notes of the discussion:
>>>>>> > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>>>>>> >
>>>>>> > The summary was to bring BoundedSource/UnboundedSource into a unified
>>>>>> > backlog-reporting mechanism, with optional other signals that Dataflow
>>>>>> > has found useful (such as whether the remaining restriction is
>>>>>> > splittable: yes, no, or unknown). Other runners can use these signals
>>>>>> > or not. At a minimum, SDFs should report backlog and watermark. The
>>>>>> > backlog should use an arbitrary-precision number such as Java's
>>>>>> > BigDecimal, to prevent issues where limited precision removes the
>>>>>> > ability to compute deltas efficiently.
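To illustrate the precision concern, a small sketch assuming a backlog that has grown very large (1e17 units is an arbitrary illustrative value, not something from the discussion): with `double`, consuming one unit can leave the computed backlog unchanged, so the delta comes out as zero, while `BigDecimal` keeps the exact difference. `BacklogPrecision` is a hypothetical name.

```java
import java.math.BigDecimal;

class BacklogPrecision {
  // Delta between two backlog reports when the backlog is tracked as a double.
  static double doubleDelta(double before, double consumed) {
    double after = before - consumed; // Rounded to the nearest representable double.
    return before - after;
  }

  // The same computation with arbitrary-precision decimals: no rounding occurs.
  static BigDecimal exactDelta(BigDecimal before, BigDecimal consumed) {
    BigDecimal after = before.subtract(consumed);
    return before.subtract(after);
  }

  public static void main(String[] args) {
    // Adjacent doubles near 1e17 are 16 apart, so subtracting 1 is lost entirely.
    System.out.println(doubleDelta(1e17, 1.0)); // prints 0.0
    System.out.println(exactDelta(new BigDecimal("100000000000000000"), BigDecimal.ONE)); // prints 1
  }
}
```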
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>> >>
>>>>>> >> Here is the link to join the discussion:
>>>>>> >> https://meet.google.com/idc-japs-hwf
>>>>>> >> Remember that it is this Friday, Sept 14th, from 11am-noon PST.
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <mx...@apache.org> wrote:
>>>>>> >>>
>>>>>> >>> Thanks for moving forward with this, Lukasz!
>>>>>> >>>
>>>>>> >>> Unfortunately, I can't make it on Friday, but I'll sync with somebody
>>>>>> >>> on the call (e.g. Ryan) about your discussion.
>>>>>> >>>
>>>>>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>>>>>> >>> > Thanks to everyone who filled out the Doodle poll. The most
>>>>>> >>> > popular time was Friday, Sept 14th, from 11am-noon PST. I'll send
>>>>>> >>> > out a calendar invite and meeting link early next week.
>>>>>> >>> >
>>>>>> >>> > I have received a lot of feedback on the document and have
>>>>>> >>> > addressed some parts of it, including:
>>>>>> >>> > * clarifying terminology
>>>>>> >>> > * processing skew due to some restrictions having their watermarks
>>>>>> >>> >   much further behind than others, affecting scheduling of bundles
>>>>>> >>> >   by runners
>>>>>> >>> > * external throttling & I/O wait overhead reporting to make sure we
>>>>>> >>> >   don't overscale
>>>>>> >>> >
>>>>>> >>> > Areas that still need additional feedback and details are:
>>>>>> >>> > * reporting progress around the work that is done and is active
>>>>>> >>> > * more examples
>>>>>> >>> > * unbounded restrictions being caused by an unbounded number of
>>>>>> >>> >   splits of existing unbounded restrictions (infinite work growth)
>>>>>> >>> > * whether we should be reporting this information at the
>>>>>> >>> >   PTransform level or at the bundle level
>>>>>>
>>>>>

Re: SplittableDoFn

Posted by Reuven Lax <re...@google.com>.
Is synchronization over an entire work item, or just inside the restriction
tracker? My concern is that some runners (especially streaming runners)
might have hundreds or thousands of parallel work items being processed for
the same SDF (for different keys), and I'm afraid of creating
lock-contention bottlenecks.

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
The synchronization is about Java thread safety: a restriction tracker is
likely to be accessed concurrently, because the runner needs to read the
backlog and request splits while the user's DoFn is executing and updating
the same tracker. This is similar to the Java thread safety already needed in
BoundedSource and UnboundedSource for fraction consumed, backlog bytes, and
splitting.
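The following minimal, self-contained Java sketch illustrates the concurrent access described above. `OffsetTrackerSketch` and its `splitKeeping` method are hypothetical stand-ins, not Beam's `OffsetRangeTracker`. The DoFn thread calls `tryClaim` while another thread reads the backlog or splits. Because every method is `synchronized`, each offset ends up either claimed by the primary or handed to the residual, never both.

```java
/**
 * Hypothetical offset tracker over [from, to), used only to illustrate why a
 * restriction tracker needs synchronization. It is not Beam's OffsetRangeTracker.
 */
public class OffsetTrackerSketch {
  private long to;          // exclusive end; shrinks when a split is accepted
  private long lastClaimed; // last offset successfully claimed by the DoFn

  public OffsetTrackerSketch(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** Called by the user's DoFn thread for each offset it wants to process. */
  public synchronized boolean tryClaim(long offset) {
    if (offset <= lastClaimed || offset >= to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  /** Called by a runner thread: offsets remaining after the last claimed one. */
  public synchronized long backlog() {
    return to - (lastClaimed + 1);
  }

  /**
   * Called by a runner thread: keep at most {@code keep} unclaimed offsets in the
   * primary and return the residual range as {start, end}.
   */
  public synchronized long[] splitKeeping(long keep) {
    long oldTo = to;
    to = Math.min(oldTo, lastClaimed + 1 + Math.max(0L, keep));
    return new long[] {to, oldTo};
  }

  public static void main(String[] args) throws InterruptedException {
    OffsetTrackerSketch tracker = new OffsetTrackerSketch(0, 1_000_000);
    long[] claimed = {0};
    Thread doFn = new Thread(() -> {
      for (long offset = 0; tracker.tryClaim(offset); offset++) {
        claimed[0]++; // "process" the claimed offset
      }
    });
    doFn.start();
    long[] residual = tracker.splitKeeping(1_000); // the main thread plays the runner
    doFn.join();
    // Every offset is processed by the primary or handed to the residual, never both.
    System.out.println(claimed[0] + (residual[1] - residual[0])); // prints 1000000
  }
}
```

The exact split point varies from run to run, depending on when the split lands relative to the DoFn thread. Without the locks, a split could shrink `to` between the range check and the update in `tryClaim`, letting the DoFn claim an offset that was already handed to the residual.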

On Fri, Sep 21, 2018 at 2:38 PM Reuven Lax <re...@google.com> wrote:

> Can you give details on what the synchronization is per? Is it per key, or
> global to each worker?

Re: SplittableDoFn

Posted by Reuven Lax <re...@google.com>.
Can you give details on what the synchronization is per? Is it per key, or
global to each worker?

On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik <lc...@google.com> wrote:

> As I was looking at the SplittableDoFn API while working towards making a
> proposal for how the backlog/splitting API could look, I found some sharp
> edges that could be improved.
>
> I noticed that:
> 1) We require users to write thread-safe code, which is something we
> haven't asked of users when writing a DoFn.
> 2) We have "internal" methods within the RestrictionTracker that are not
> meant to be used by the runner.
>
> I can fix these issues by giving the user a forwarding restriction
> tracker[1] that provides an appropriate level of synchronization as needed
> and also provides the necessary observation hooks to see when a claim
> failed or succeeded.
>
> This requires a change to our experimental API since we need to pass
> a RestrictionTracker to the @ProcessElement method instead of a sub-type of
> RestrictionTracker.
> @ProcessElement
> public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
> becomes:
> @ProcessElement
> public void processElement(
>     ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }
>
> This has the additional benefit of preventing users from working around
> the RestrictionTracker APIs and potentially changing the underlying
> tracker outside of the tryClaim call.
>
> The full implementation is available in this PR[2]; I was wondering what
> people thought.
>
> 1:
> https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
> 2: https://github.com/apache/beam/pull/6467

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
As I was looking at the SplittableDoFn API while working towards making a
proposal for how the backlog/splitting API could look, I found some sharp
edges that could be improved.

I noticed that:
1) We require users to write thread-safe code, which is something we
haven't asked of users when writing a DoFn.
2) We have "internal" methods within the RestrictionTracker that are not
meant to be used by the runner.

I can fix these issues by giving the user a forwarding restriction
tracker[1] that provides an appropriate level of synchronization as needed
and also provides the necessary observation hooks to see when a claim
failed or succeeded.

This requires a change to our experimental API since we need to pass
a RestrictionTracker to the @ProcessElement method instead of a sub-type of
RestrictionTracker.
@ProcessElement
public void processElement(ProcessContext context, OffsetRangeTracker tracker) { ... }
becomes:
@ProcessElement
public void processElement(
    ProcessContext context, RestrictionTracker<OffsetRange, Long> tracker) { ... }

This has the additional benefit of preventing users from working around
the RestrictionTracker APIs and potentially changing the underlying
tracker outside of the tryClaim call.

The full implementation is available in this PR[2]; I was wondering what
people thought.

1:
https://github.com/apache/beam/pull/6467/files#diff-ed95abb6bc30a9ed07faef5c3fea93f0R72
2: https://github.com/apache/beam/pull/6467
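A rough, self-contained Java sketch of the forwarding-tracker idea described above might look like the following. All names are hypothetical and are not the PR's actual classes: `Tracker` stands in for a restriction tracker interface, `ClaimObserver` for the runner-side hooks, and `Forwarding` for the wrapper. The DoFn body only ever receives the interface type, every call goes through one lock, and the runner learns whether each claim succeeded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a synchronized forwarding tracker with claim hooks. */
public class ForwardingTrackerSketch {

  /** Hypothetical minimal tracker interface; the DoFn is only ever handed this type. */
  public interface Tracker<PositionT> {
    boolean tryClaim(PositionT position);
  }

  /** Hypothetical runner-side hooks for observing claims. */
  public interface ClaimObserver<PositionT> {
    void onClaimed(PositionT position);

    void onClaimFailed(PositionT position);
  }

  /** Plain, unsynchronized offset tracker over [from, to). */
  public static final class OffsetTracker implements Tracker<Long> {
    private final long to;
    private long lastClaimed;

    public OffsetTracker(long from, long to) {
      this.to = to;
      this.lastClaimed = from - 1;
    }

    @Override
    public boolean tryClaim(Long offset) {
      if (offset <= lastClaimed || offset >= to) {
        return false;
      }
      lastClaimed = offset;
      return true;
    }
  }

  /** Forwards to a delegate under a lock and reports each claim's outcome. */
  public static final class Forwarding<PositionT> implements Tracker<PositionT> {
    private final Tracker<PositionT> delegate;
    private final ClaimObserver<PositionT> observer;

    public Forwarding(Tracker<PositionT> delegate, ClaimObserver<PositionT> observer) {
      this.delegate = delegate;
      this.observer = observer;
    }

    @Override
    public synchronized boolean tryClaim(PositionT position) {
      boolean claimed = delegate.tryClaim(position);
      if (claimed) {
        observer.onClaimed(position);
      } else {
        observer.onClaimFailed(position);
      }
      return claimed;
    }
  }

  /** Stand-in for a @ProcessElement body: it can only call the interface methods. */
  public static int process(Tracker<Long> tracker, long start) {
    int processed = 0;
    for (long offset = start; tracker.tryClaim(offset); offset++) {
      processed++;
    }
    return processed;
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    Tracker<Long> tracker =
        new Forwarding<>(
            new OffsetTracker(0, 3),
            new ClaimObserver<Long>() {
              @Override
              public void onClaimed(Long position) {
                events.add("claimed " + position);
              }

              @Override
              public void onClaimFailed(Long position) {
                events.add("failed " + position);
              }
            });
    System.out.println(process(tracker, 0)); // prints 3
    System.out.println(events); // prints [claimed 0, claimed 1, claimed 2, failed 3]
  }
}
```

Because `process` sees only `Tracker<Long>`, it cannot reach into the concrete `OffsetTracker` and modify it outside of `tryClaim`. That is the benefit the message describes.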


On Mon, Sep 17, 2018 at 12:45 PM Lukasz Cwik <lc...@google.com> wrote:

> The changes to the API have not been proposed yet. So far it has all been
> about what the representation is and why.
>
> For splitting, the current idea is to use the backlog as a way of telling
> the SplittableDoFn where to split, so the split would be expressed in terms
> of whatever the SDK decided to report.
> The runner always chooses a backlog value that is relative to the SDK's
> reported backlog. It would be up to the SDK to round/clamp the number given
> by the runner to something meaningful for itself.
> For example, if the SDK was reporting a backlog of 500 bytes remaining in a
> file, the runner could provide a value like 212.2, which the SDK would then
> round to 212.
> If the SDK was reporting a backlog of 57 Pub/Sub messages, the runner could
> provide a value like 300, which would mean reading those 57 messages and
> then another 243 as part of the current restriction.
>
> I believe that BoundedSource/UnboundedSource will have wrappers added that
> provide a basic SplittableDoFn implementation, so existing IOs should be
> able to migrate over without API changes.
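Here is a minimal Java sketch of the bounded example above. It assumes the backlog is "bytes remaining" in an offset range and that the runner's requested value means "how much backlog the current (primary) restriction should keep". `BacklogSplitSketch`, `backlog`, and `splitPoint` are hypothetical helpers, not Beam APIs. The unbounded Pub/Sub case, where the requested value can exceed the current backlog, is not modeled.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Hypothetical helpers for a bounded offset range [position, end) whose backlog
 * is reported in bytes. Not a Beam API.
 */
public class BacklogSplitSketch {

  /** Backlog reported by the SDK: bytes remaining after the current position. */
  public static BigDecimal backlog(long position, long end) {
    return BigDecimal.valueOf(end - position);
  }

  /**
   * Split point such that the primary keeps about {@code requestedBacklog} bytes;
   * the residual is [splitPoint, end). Returns {@code end} when nothing is left to
   * hand off.
   */
  public static long splitPoint(long position, long end, BigDecimal requestedBacklog) {
    // Round to a whole number of bytes, e.g. 212.2 -> 212.
    long keep = requestedBacklog.setScale(0, RoundingMode.HALF_UP).longValueExact();
    // Clamp into [0, remaining] so the split never lands before the position or past the end.
    long remaining = end - position;
    keep = Math.max(0L, Math.min(keep, remaining));
    return position + keep;
  }

  public static void main(String[] args) {
    long position = 0;
    long end = 500;
    System.out.println(backlog(position, end)); // prints 500
    long split = splitPoint(position, end, new BigDecimal("212.2"));
    // prints primary [0, 212), residual [212, 500)
    System.out.println(
        "primary [" + position + ", " + split + "), residual [" + split + ", " + end + ")");
  }
}
```

Rounding half-up is an arbitrary choice made for this sketch; flooring would give the same 212 here. The key point is that the SDK, not the runner, decides how a fractional request maps onto a meaningful position.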
>
> On Mon, Sep 17, 2018 at 1:11 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Thanks a lot Luke for bringing this back to the mailing list and Ryan for
>> taking the notes.
>>
>> I would like to know if there was some discussion, or if you guys have
>> given some thought to the required changes in the SDK (API) part. What will
>> be the equivalent of `splitAtFraction`, and what should IO authors do to
>> support it?
>>
>> On Sat, Sep 15, 2018 at 1:52 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > Thanks to everyone who joined and for the questions asked.
>> >
>> > Ryan graciously collected notes of the discussion:
>> > https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing
>> >
>> > The summary was to bring BoundedSource/UnboundedSource onto a unified
>> > backlog-reporting mechanism, with optional additional signals that
>> > Dataflow has found useful (such as whether the remaining restriction is
>> > splittable: yes, no, or unknown). Other runners can use them or not. At a
>> > minimum, SDFs should report backlog and watermark. The backlog should use
>> > an arbitrary-precision decimal such as Java's BigDecimal to prevent issues
>> > where limited precision removes the ability to compute deltas efficiently.
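The following sketch shows, in Java, the minimum signals from the summary above: a backlog, a watermark, and an optional yes/no/unknown splittability flag. It also shows why limited precision can hide a delta. `ProgressSketch`, `ProgressReport`, and `Splittable` are hypothetical names, and representing the watermark as a `java.time.Instant` is an assumption. The backlog values are deliberately contrived, just above 2^53, so that a `double` cannot tell one unit of progress apart.

```java
import java.math.BigDecimal;
import java.time.Instant;

/**
 * Hypothetical shape of the minimum progress signals from the summary above.
 * ProgressReport and Splittable are illustrative names, not Beam APIs.
 */
public class ProgressSketch {

  /** Optional signal: is the remaining restriction splittable? */
  public enum Splittable { YES, NO, UNKNOWN }

  /** Minimum bar: backlog and watermark; splittability is optional. */
  public static final class ProgressReport {
    public final BigDecimal backlog;
    public final Instant watermark;
    public final Splittable splittable;

    public ProgressReport(BigDecimal backlog, Instant watermark, Splittable splittable) {
      this.backlog = backlog;
      this.watermark = watermark;
      this.splittable = splittable;
    }
  }

  /** Work done between two backlog readings, computed with doubles. */
  public static double doubleDelta(long before, long after) {
    return (double) before - (double) after;
  }

  /** The same delta computed exactly with BigDecimal. */
  public static BigDecimal exactDelta(long before, long after) {
    return BigDecimal.valueOf(before).subtract(BigDecimal.valueOf(after));
  }

  public static void main(String[] args) {
    long before = 9_007_199_254_740_993L; // 2^53 + 1, not representable as a double
    long after = 9_007_199_254_740_992L;  // 2^53
    System.out.println(doubleDelta(before, after)); // prints 0.0: one unit of progress is lost
    System.out.println(exactDelta(before, after));  // prints 1

    ProgressReport report =
        new ProgressReport(BigDecimal.valueOf(after), Instant.EPOCH, Splittable.UNKNOWN);
    System.out.println(report.backlog + " " + report.splittable); // prints 9007199254740992 UNKNOWN
  }
}
```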
>> >
>> >
>> >
>> > On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> Here is the link to join the discussion:
>> >> https://meet.google.com/idc-japs-hwf
>> >> Remember that it is this Friday, Sept 14th, from 11am to noon PST.
>> >>
>> >>
>> >>
>> >> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <mx...@apache.org> wrote:
>> >>>
>> >>> Thanks for moving forward with this, Lukasz!
>> >>>
>> >>> Unfortunately, I can't make it on Friday, but I'll sync with somebody on
>> >>> the call (e.g. Ryan) about your discussion.
>> >>>
>> >>> On 08.09.18 02:00, Lukasz Cwik wrote:
>> >>> > Thanks to everyone who filled out the Doodle poll. The most popular
>> >>> > time was Friday Sept 14th from 11am-noon PST. I'll send out a
>> >>> > calendar invite and meeting link early next week.
>> >>> >
>> >>> > I have received a lot of feedback on the document and have addressed
>> >>> > some parts of it, including:
>> >>> > * clarifying terminology
>> >>> > * processing skew due to some restrictions having their watermarks
>> >>> > much further behind than others, affecting how runners schedule
>> >>> > bundles
>> >>> > * external throttling & I/O wait overhead reporting, to make sure we
>> >>> > don't overscale
>> >>> >
>> >>> > Areas that still need additional feedback and details are:
>> >>> > * reporting progress around the work that is done and is active
>> >>> > * more examples
>> >>> > * unbounded restrictions caused by an unbounded number of splits of
>> >>> > existing unbounded restrictions (infinite work growth)
>> >>> > * whether we should be reporting this information at the PTransform
>> >>> > level or at the bundle level
>> >>> >
>> >>> >
>> >>> >
>> >>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <lcwik@google.com
>> >>> > <ma...@google.com>> wrote:
>> >>> >
>> >>> >     Thanks to all those who have provided interest in this topic by
>> the
>> >>> >     questions they have asked on the doc already and for those
>> >>> >     interested in having this discussion. I have setup this doodle
>> to
>> >>> >     allow people to provide their availability:
>> >>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>> >>> >
>> >>> >     I'll send out the chosen time based upon peoples availability
>> and a
>> >>> >     Hangout link by end of day Friday so please mark your
>> availability
>> >>> >     using the link above.
>> >>> >
>> >>> >     The agenda of the meeting will be as follows:
>> >>> >     * Overview of the proposal
>> >>> >     * Enumerate and discuss/answer questions brought up in the
>> meeting
>> >>> >
>> >>> >     Note that all questions and any discussions/answers provided
>> will be
>> >>> >     added to the doc for those who are unable to attend.
>> >>> >
>> >>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>> >>> >     <jb@nanthrax.net <ma...@nanthrax.net>> wrote:
>> >>> >
>> >>> >         +1
>> >>> >
>> >>> >         Regards
>> >>> >         JB
>> >>> >         Le 31 août 2018, à 18:22, Lukasz Cwik <lcwik@google.com
>> >>> >         <ma...@google.com>> a écrit:
>> >>> >
>> >>> >             That is possible, I'll take people's date/time
>> suggestions
>> >>> >             and create a simple online poll with them.
>> >>> >
>> >>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>> >>> >             <robertwb@google.com <ma...@google.com>>
>> wrote:
>> >>> >
>> >>> >                 Thanks for taking this up. I added some comments to
>> the
>> >>> >                 doc. A European-friendly time for discussion would
>> >>> >                 be great.
>> >>> >
>> >>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>> >>> >                 <lcwik@google.com <ma...@google.com>> wrote:
>> >>> >
>> >>> >                     I came up with a proposal[1] for a progress
>> model
>> >>> >                     solely based off of the backlog and that splits
>> >>> >                     should be based upon the remaining backlog we
>> want
>> >>> >                     the SDK to split at. I also give
>> recommendations to
>> >>> >                     runner authors as to how an autoscaling system
>> could
>> >>> >                     work based upon the measured backlog. A lot of
>> >>> >                     discussions around progress reporting and
>> splitting
>> >>> >                     in the past has always been around finding an
>> >>> >                     optimal solution, after reading a lot of
>> information
>> >>> >                     about work stealing, I don't believe there is a
>> >>> >                     general solution and it really is upto
>> >>> >                     SplittableDoFns to be well behaved. I did not do
>> >>> >                     much work in classifying what a well behaved
>> >>> >                     SplittableDoFn is though. Much of this work
>> builds
>> >>> >                     off ideas that Eugene had documented in the
>> past[2].
>> >>> >
>> >>> >                     I could use the communities wide knowledge of
>> >>> >                     different I/Os to see if computing the backlog
>> is
>> >>> >                     practical in the way that I'm suggesting and to
>> >>> >                     gather people's feedback.
>> >>> >
>> >>> >                     If there is a lot of interest, I would like to
>> hold
>> >>> >                     a community video conference between Sept 10th
>> and
>> >>> >                     14th about this topic. Please reply with your
>> >>> >                     availability by Sept 6th if your interested.
>> >>> >
>> >>> >                     1:
>> https://s.apache.org/beam-bundles-backlog-splitting
>> >>> >                     2: https://s.apache.org/beam-breaking-fusion
>> >>> >
>> >>> >                     On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste
>> >>> >                     Onofré <jb@nanthrax.net <ma...@nanthrax.net>>
>> wrote:
>> >>> >
>> >>> >                         Awesome !
>> >>> >
>> >>> >                         Thanks Luke !
>> >>> >
>> >>> >                         I plan to work with you and others on this
>> one.
>> >>> >
>> >>> >                         Regards
>> >>> >                         JB
>> >>> >                         Le 13 août 2018, à 19:14, Lukasz Cwik
>> >>> >                         <lcwik@google.com <ma...@google.com>>
>> a
>> >>> >                         écrit:
>> >>> >
>> >>> >                             I wanted to reach out that I will be
>> >>> >                             continuing from where Eugene left off
>> with
>> >>> >                             SplittableDoFn. I know that many of you
>> have
>> >>> >                             done a bunch of work with IOs and/or
>> runner
>> >>> >                             integration for SplittableDoFn and would
>> >>> >                             appreciate your help in advancing this
>> >>> >                             awesome idea. If you have questions or
>> >>> >                             things you want to get reviewed related
>> to
>> >>> >                             SplittableDoFn, feel free to send them
>> my
>> >>> >                             way or include me on anything
>> SplittableDoFn
>> >>> >                             related.
>> >>> >
>> >>> >                             I was part of several discussions with
>> >>> >                             Eugene and I think the biggest
>> outstanding
>> >>> >                             design portion is to figure out how
>> dynamic
>> >>> >                             work rebalancing would play out with the
>> >>> >                             portability APIs. This includes
>> reporting of
>> >>> >                             progress from within a bundle. I know
>> that
>> >>> >                             Eugene had shared some documents in this
>> >>> >                             regard but the position / split models
>> >>> >                             didn't work too cleanly in a unified
>> sense
>> >>> >                             for bounded and unbounded
>> SplittableDoFns.
>> >>> >                             It will likely take me awhile to gather
>> my
>> >>> >                             thoughts but could use your expertise
>> as to
>> >>> >                             how compatible these ideas are with
>> respect
>> >>> >                             to to IOs and runners
>> >>> >                             Flink/Spark/Dataflow/Samza/Apex/... and
>> >>> >                             obviously help during implementation.
>> >>> >
>>
>

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
The changes to the API have not been proposed yet. So far, it has all been
about what the representation is and why.

For splitting, the current idea is to use the backlog as a way of telling
the SplittableDoFn where to split, so the split point would be expressed in
terms of whatever the SDK decided to report.
The runner always chooses a backlog value that is relative to the SDK's
reported backlog. It would be up to the SDK to round/clamp the number given
by the runner to something meaningful for itself.
For example, if the SDK was reporting a backlog of 500 bytes remaining in a
file, then the runner could provide a value like 212.2, which the SDK would
then round to 212.
If the SDK was reporting a backlog of 57 Pub/Sub messages, then the runner
could provide a value like 300, which would mean reading those 57 messages
and then another 243 as part of the current restriction.
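
The split-at-backlog idea above can be sketched in Java as follows. This is a minimal, hypothetical illustration, not a Beam API: `OffsetRange`, `Split` and `splitAtBacklog` are invented names, the restriction is assumed to be a range of offsets (bytes in a file, or a running message count for an unbounded source), and the runner's value is assumed to mean "how much backlog the current restriction should keep". It uses Java 16+ records.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Optional;

/**
 * Hypothetical sketch (not a Beam API): the runner names how much backlog the
 * current restriction should keep, and the SDK rounds/clamps that number to a
 * whole offset that is meaningful for its own restriction.
 */
public class BacklogSplitSketch {

  /** Assumed marker for an unbounded restriction end. */
  static final long UNBOUNDED = Long.MAX_VALUE;

  /** Hypothetical restriction over offsets [from, to). */
  record OffsetRange(long from, long to) {}

  /** Hypothetical split result: primary keeps processing, residual goes back to the runner. */
  record Split(OffsetRange primary, OffsetRange residual) {}

  /**
   * Splits {@code current} so that the primary keeps about {@code requestedBacklog}
   * offsets beyond {@code position}. Returns empty when nothing would be handed back.
   */
  static Optional<Split> splitAtBacklog(
      OffsetRange current, long position, BigDecimal requestedBacklog) {
    // Clamp to [0, offsets left in the restriction]; for an unbounded
    // restriction the upper bound is effectively unlimited.
    long maxKeep = current.to() - position;
    BigDecimal clamped =
        requestedBacklog.max(BigDecimal.ZERO).min(BigDecimal.valueOf(maxKeep));
    // Round the runner's arbitrary-precision value to a whole number of offsets.
    long keep = clamped.setScale(0, RoundingMode.HALF_UP).longValueExact();
    long splitPoint = position + keep;
    if (splitPoint >= current.to()) {
      return Optional.empty(); // the primary would keep everything
    }
    return Optional.of(new Split(
        new OffsetRange(current.from(), splitPoint),
        new OffsetRange(splitPoint, current.to())));
  }

  public static void main(String[] args) {
    // Bounded: a 1000-byte file read up to offset 500, so the reported backlog is 500.
    // The runner asks to keep 212.2; the SDK rounds to 212 and splits at offset 712.
    System.out.println(
        splitAtBacklog(new OffsetRange(0, 1000), 500, new BigDecimal("212.2")));

    // Unbounded: 1000 messages consumed, 57 currently available (reported backlog 57).
    // The runner asks to keep 300: the primary reads the 57 available messages plus
    // another 243, and the residual starts at message 1300.
    System.out.println(
        splitAtBacklog(new OffsetRange(0, UNBOUNDED), 1000, new BigDecimal("300")));
  }
}
```

Clamping before rounding keeps the rounded value within the restriction, so the split point never overflows or passes the end of a bounded range; an unbounded restriction has no such upper limit, which is why the runner may ask for more than the 57 messages currently reported.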

I believe that BoundedSource/UnboundedSource will have wrappers added that
provide a basic SplittableDoFn implementation, so existing IOs can be
migrated over without API changes.

Re: SplittableDoFn

Posted by Ismaël Mejía <ie...@gmail.com>.
Thanks a lot, Luke, for bringing this back to the mailing list, and Ryan for
taking the notes.

I would like to know if there was some discussion, or if you guys have given
some thought to the required changes in the SDK (API) part. What will be the
equivalent of `splitAtFraction`, and what should IO authors do to support it?

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
Thanks to everyone who joined and for the questions asked.

Ryan graciously collected notes of the discussion:
https://docs.google.com/document/d/1kjJLGIiNAGvDiUCMEtQbw8tyOXESvwGeGZLL-0M06fQ/edit?usp=sharing

The summary was that BoundedSource/UnboundedSource should be brought into a
unified backlog-reporting mechanism, with optional additional signals that
Dataflow has found useful (such as whether the remaining restriction is
splittable: yes, no, or unknown). Other runners can use these signals or not.
At a minimum, SDFs should report backlog and watermark. The backlog should use
an arbitrary-precision number such as Java's BigDecimal, to prevent limited
precision from removing the ability to compute deltas efficiently.
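
To illustrate the precision point, here is a small Java sketch. The readings are hypothetical and the class is not part of Beam; it only shows that subtracting two nearby backlog values held as `double` can lose the delta entirely, while `BigDecimal` keeps it exact.

```java
import java.math.BigDecimal;

/**
 * Illustrative sketch (hypothetical readings, not Beam code): a backlog held
 * as a double can lose the difference between two nearby readings, while
 * BigDecimal keeps it exact.
 */
public class BacklogPrecisionSketch {

  /** Delta between two backlog readings computed with double precision. */
  static double doubleDelta(long before, long after) {
    return (double) before - (double) after;
  }

  /** Delta between two backlog readings computed with arbitrary precision. */
  static BigDecimal exactDelta(long before, long after) {
    return BigDecimal.valueOf(before).subtract(BigDecimal.valueOf(after));
  }

  public static void main(String[] args) {
    // One unit of progress on a very large backlog.
    // 2^53 + 1 is the smallest positive integer a double cannot represent exactly,
    // so it rounds to 2^53 and the difference disappears.
    long before = (1L << 53) + 1;
    long after = 1L << 53;
    System.out.println("double delta:     " + doubleDelta(before, after)); // prints 0.0
    System.out.println("BigDecimal delta: " + exactDelta(before, after));  // prints 1
  }
}
```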



On Wed, Sep 12, 2018 at 3:54 PM Lukasz Cwik <lc...@google.com> wrote:

> Here is the link to join the discussion:
> https://meet.google.com/idc-japs-hwf
> Remember that it is this Friday Sept 14th from 11am-noon PST.
>
>
>
> On Mon, Sep 10, 2018 at 7:30 AM Maximilian Michels <mx...@apache.org> wrote:
>
>> Thanks for moving forward with this, Lukasz!
>>
>> Unfortunately, I can't make it on Friday, but I'll sync with somebody on
>> the call (e.g. Ryan) about your discussion.
>>
>> On 08.09.18 02:00, Lukasz Cwik wrote:
>> > Thanks to everyone who filled out the doodle poll. The most
>> > popular time was Friday Sept 14th from 11am-noon PST. I'll send out a
>> > calendar invite and meeting link early next week.
>> >
>> > I have received a lot of feedback on the document and have addressed
>> > some parts of it, including:
>> > * clarifying terminology
>> > * processing skew due to some restrictions having their watermarks much
>> > further behind than others, affecting scheduling of bundles by runners
>> > * external throttling & I/O wait overhead reporting to make sure we
>> > don't overscale
>> >
>> > Areas that still need additional feedback and details are:
>> > * reporting progress around the work that is done and is active
>> > * more examples
>> > * unbounded restrictions being caused by an unbounded number of splits
>> > of existing unbounded restrictions (infinite work growth)
>> > * whether we should be reporting this information at the PTransform
>> > level or at the bundle level
>> >
>> >
>> >
>> > On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <lcwik@google.com> wrote:
>> >
>> >     Thanks to all those who have shown interest in this topic through the
>> >     questions they have already asked on the doc, and to those
>> >     interested in having this discussion. I have set up this doodle to
>> >     allow people to provide their availability:
>> >     https://doodle.com/poll/nrw7w84255xnfwqy
>> >
>> >     I'll send out the chosen time based upon people's availability and a
>> >     Hangout link by end of day Friday, so please mark your availability
>> >     using the link above.
>> >
>> >     The agenda of the meeting will be as follows:
>> >     * Overview of the proposal
>> >     * Enumerate and discuss/answer questions brought up in the meeting
>> >
>> >     Note that all questions and any discussions/answers provided will be
>> >     added to the doc for those who are unable to attend.
>> >
>> >     On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré
>> >     <jb@nanthrax.net> wrote:
>> >
>> >         +1
>> >
>> >         Regards
>> >         JB
>> >         On 31 Aug 2018, at 18:22, Lukasz Cwik <lcwik@google.com> wrote:
>> >
>> >             That is possible. I'll take people's date/time suggestions
>> >             and create a simple online poll with them.
>> >
>> >             On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw
>> >             <robertwb@google.com> wrote:
>> >
>> >                 Thanks for taking this up. I added some comments to the
>> >                 doc. A European-friendly time for discussion would
>> >                 be great.
>> >
>> >                 On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik
>> >                 <lcwik@google.com <ma...@google.com>> wrote:
>> >
>> >                     I came up with a proposal[1] for a progress model
>> >                     based solely on the backlog, in which splits are
>> >                     expressed in terms of the remaining backlog we want
>> >                     the SDK to split at. I also give recommendations to
>> >                     runner authors on how an autoscaling system could
>> >                     work based on the measured backlog. Past
>> >                     discussions around progress reporting and splitting
>> >                     have always focused on finding an optimal
>> >                     solution. After reading a lot of information about
>> >                     work stealing, I don't believe there is a general
>> >                     solution, and it really is up to SplittableDoFns to
>> >                     be well behaved. I did not do much work on
>> >                     classifying what a well-behaved SplittableDoFn is,
>> >                     though. Much of this work builds on ideas that
>> >                     Eugene had documented in the past[2].
>> >
>> >                     I could use the community's wide knowledge of
>> >                     different I/Os to see whether computing the backlog
>> >                     is practical in the way that I'm suggesting, and to
>> >                     gather people's feedback.
>> >
>> >                     If there is a lot of interest, I would like to hold
>> >                     a community video conference between Sept 10th and
>> >                     14th about this topic. Please reply with your
>> >                     availability by Sept 6th if you're interested.
>> >
>> >                     1: https://s.apache.org/beam-bundles-backlog-splitting
>> >                     2: https://s.apache.org/beam-breaking-fusion
>> >
>> >                     On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste
>> >                     Onofré <jb@nanthrax.net> wrote:
>> >
>> >                         Awesome!
>> >
>> >                         Thanks, Luke!
>> >
>> >                         I plan to work with you and others on this one.
>> >
>> >                         Regards
>> >                         JB
>> >                         On Aug 13, 2018, at 19:14, Lukasz Cwik
>> >                         <lcwik@google.com> wrote:
>> >
>> >                             I wanted to let you know that I will be
>> >                             continuing where Eugene left off with
>> >                             SplittableDoFn. I know that many of you have
>> >                             done a lot of work on IOs and/or runner
>> >                             integration for SplittableDoFn, and I would
>> >                             appreciate your help in advancing this
>> >                             awesome idea. If you have questions or
>> >                             things you want reviewed related to
>> >                             SplittableDoFn, feel free to send them my
>> >                             way or include me on anything
>> >                             SplittableDoFn related.
>> >
>> >                             I was part of several discussions with
>> >                             Eugene, and I think the biggest outstanding
>> >                             design piece is figuring out how dynamic
>> >                             work rebalancing would play out with the
>> >                             portability APIs. This includes reporting
>> >                             progress from within a bundle. I know that
>> >                             Eugene had shared some documents in this
>> >                             regard, but the position/split models
>> >                             didn't work very cleanly in a unified way
>> >                             for bounded and unbounded SplittableDoFns.
>> >                             It will likely take me a while to gather my
>> >                             thoughts, but I could use your expertise on
>> >                             how compatible these ideas are with IOs and
>> >                             runners (Flink/Spark/Dataflow/Samza/Apex/...),
>> >                             and obviously your help during
>> >                             implementation.
>> >
>>
>

Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
Here is the link to join the discussion:
https://meet.google.com/idc-japs-hwf
Remember that it is this Friday, Sept 14th, from 11am to noon PST.




Re: SplittableDoFn

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for moving forward with this, Lukasz!

Unfortunately, I can't make it on Friday, but I'll sync with somebody on
the call (e.g. Ryan) about your discussion.


Re: SplittableDoFn

Posted by Lukasz Cwik <lc...@google.com>.
Thanks to everyone who filled out the Doodle poll. The most popular time
was Friday, Sept 14th, from 11am to noon PST. I'll send out a calendar
invite and meeting link early next week.

I have received a lot of feedback on the document and have addressed some
parts of it, including:
* clarifying terminology
* processing skew caused by some restrictions having watermarks much
further behind than others, which affects how runners schedule bundles
* reporting external throttling and I/O wait overhead to make sure we
don't overscale

Areas that still need additional feedback and details are:
* reporting progress on work that is done and work that is active
* more examples
* unbounded restrictions being caused by an unbounded number of splits of
existing unbounded restrictions (infinite work growth)
* whether we should be reporting this information at the PTransform level
or at the bundle level
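To make the open item on progress reporting a little more concrete, here is a minimal Python sketch of backlog-based progress and splitting over an offset range. It is only an illustration under stated assumptions, not the design in the proposal doc and not any Beam SDK API: the restriction is assumed to be a half-open range of offsets, `OffsetRange`, `BacklogOffsetTracker`, `progress()` and `try_split()` are hypothetical names, every claimed offset is counted as completed, and a split keeps the requested fraction of the remaining backlog while handing the rest off as a residual.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class OffsetRange:
    """Hypothetical restriction: the half-open offset range [start, stop)."""
    start: int
    stop: int


class BacklogOffsetTracker:
    """Hypothetical tracker (not a Beam SDK API) that reports progress as
    (completed, remaining backlog) and splits at a fraction of the backlog."""

    def __init__(self, restriction: OffsetRange):
        self.range = restriction
        self.last_claimed: Optional[int] = None

    def try_claim(self, offset: int) -> bool:
        # Offsets must be claimed in increasing order.
        if self.last_claimed is not None and offset <= self.last_claimed:
            raise ValueError("offsets must be claimed in increasing order")
        # Claiming fails once the offset falls outside the (possibly split) range.
        if offset < self.range.start or offset >= self.range.stop:
            return False
        self.last_claimed = offset
        return True

    def _next_offset(self) -> int:
        # First offset that has not been claimed yet.
        if self.last_claimed is None:
            return self.range.start
        return self.last_claimed + 1

    def progress(self) -> Tuple[int, int]:
        # Every claimed offset counts as completed; the rest is remaining backlog.
        next_offset = self._next_offset()
        return next_offset - self.range.start, self.range.stop - next_offset

    def try_split(
        self, fraction_of_remainder: float
    ) -> Optional[Tuple[OffsetRange, OffsetRange]]:
        # Keep `fraction_of_remainder` of the remaining backlog in the primary and
        # return the rest as a residual that could be scheduled elsewhere.
        if not 0.0 <= fraction_of_remainder < 1.0:
            raise ValueError("fraction_of_remainder must be in [0, 1)")
        next_offset = self._next_offset()
        remaining = self.range.stop - next_offset
        split_at = next_offset + int(remaining * fraction_of_remainder)
        if split_at >= self.range.stop:
            return None  # no backlog left to hand off
        primary = OffsetRange(self.range.start, split_at)
        residual = OffsetRange(split_at, self.range.stop)
        self.range = primary
        return primary, residual


tracker = BacklogOffsetTracker(OffsetRange(0, 100))
for offset in range(20):
    tracker.try_claim(offset)
print(tracker.progress())      # (20, 80)
print(tracker.try_split(0.5))  # keeps [0, 60), hands off [60, 100)
print(tracker.progress())      # (20, 40)
print(tracker.try_claim(60))   # False: offset 60 now belongs to the residual
```

One possible reason to express the split point as a fraction of the remaining backlog, rather than as an absolute position, is that the caller then only needs the backlog the tracker reports, not knowledge of how offsets are laid out inside the restriction.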



On Wed, Sep 5, 2018 at 1:53 PM Lukasz Cwik <lc...@google.com> wrote:

> Thanks to all those who have shown interest in this topic through the
> questions they have already asked on the doc, and to those interested in
> having this discussion. I have set up this Doodle poll so people can
> provide their availability:
> https://doodle.com/poll/nrw7w84255xnfwqy
>
> I'll send out the chosen time, based on people's availability, and a
> Hangouts link by end of day Friday, so please mark your availability using
> the link above.
>
> The agenda of the meeting will be as follows:
> * Overview of the proposal
> * Enumerate and discuss/answer questions brought up in the meeting
>
> Note that all questions and any discussions/answers provided will be added
> to the doc for those who are unable to attend.
>
> On Fri, Aug 31, 2018 at 9:47 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
> wrote:
>
>> +1
>>
>> Regards
>> JB
>> On Aug 31, 2018, at 18:22, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>> That is possible; I'll take people's date/time suggestions and create a
>>> simple online poll with them.
>>>
>>> On Fri, Aug 31, 2018 at 2:22 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Thanks for taking this up. I added some comments to the doc. A
>>>> European-friendly time for discussion would be great.
>>>>
>>>> On Fri, Aug 31, 2018 at 3:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> I came up with a proposal[1] for a progress model based solely on the
>>>>> backlog, in which splits are expressed in terms of the remaining backlog
>>>>> we want the SDK to split at. I also give recommendations to runner
>>>>> authors on how an autoscaling system could work based on the measured
>>>>> backlog. Past discussions around progress reporting and splitting have
>>>>> always focused on finding an optimal solution. After reading a lot of
>>>>> information about work stealing, I don't believe there is a general
>>>>> solution, and it really is up to SplittableDoFns to be well behaved. I
>>>>> did not do much work on classifying what a well-behaved SplittableDoFn
>>>>> is, though. Much of this work builds on ideas that Eugene had documented
>>>>> in the past[2].
>>>>>
>>>>> I could use the community's wide knowledge of different I/Os to see
>>>>> whether computing the backlog is practical in the way that I'm suggesting,
>>>>> and to gather people's feedback.
>>>>>
>>>>> If there is a lot of interest, I would like to hold a community video
>>>>> conference between Sept 10th and 14th about this topic. Please reply with
>>>>> your availability by Sept 6th if you're interested.
>>>>>
>>>>> 1: https://s.apache.org/beam-bundles-backlog-splitting
>>>>> 2: https://s.apache.org/beam-breaking-fusion
>>>>>
>>>>> On Mon, Aug 13, 2018 at 10:21 AM Jean-Baptiste Onofré <jb...@nanthrax.net>
>>>>> wrote:
>>>>>
>>>>>> Awesome!
>>>>>>
>>>>>> Thanks, Luke!
>>>>>>
>>>>>> I plan to work with you and others on this one.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>> On Aug 13, 2018, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>> I wanted to let you know that I will be continuing where Eugene left
>>>>>>> off with SplittableDoFn. I know that many of you have done a lot of
>>>>>>> work on IOs and/or runner integration for SplittableDoFn, and I would
>>>>>>> appreciate your help in advancing this awesome idea. If you have
>>>>>>> questions or things you want reviewed related to SplittableDoFn, feel
>>>>>>> free to send them my way or include me on anything SplittableDoFn related.
>>>>>>>
>>>>>>> I was part of several discussions with Eugene, and I think the
>>>>>>> biggest outstanding design piece is figuring out how dynamic work
>>>>>>> rebalancing would play out with the portability APIs. This includes
>>>>>>> reporting progress from within a bundle. I know that Eugene had shared
>>>>>>> some documents in this regard, but the position/split models didn't work
>>>>>>> very cleanly in a unified way for bounded and unbounded SplittableDoFns.
>>>>>>> It will likely take me a while to gather my thoughts, but I could use
>>>>>>> your expertise on how compatible these ideas are with IOs and runners
>>>>>>> (Flink/Spark/Dataflow/Samza/Apex/...), and obviously your help during
>>>>>>> implementation.
>>>>>>>
>>>>>>