You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Eugene Kirpichov <ki...@google.com.INVALID> on 2017/04/01 08:17:04 UTC

Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners

Hey all,

The Flink PR has been merged, and thus - Flink becomes the first
distributed runner to support Splittable DoFn!!!
Thank you, Aljoscha!

Looking forward to Spark and Apex, and continuing work on Dataflow.
I'll also send proposals about a couple of new ideas related to SDF next
week.

On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <am...@gmail.com> wrote:

> I will not be able to make it this weekend, too busy. Let's chat at the
> beginning of next week and see what's on my plate.
>
> On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Thanks for the offers, guys! The code is finished, though. I only need
> > to do the last touch ups.
> >
> > On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> > > Hi Aljoscha,
> > > I would like to work on the Flink runner with you.
> > >
> >
> Best,JingsongLee------------------------------------------------------------------From:Jean-Baptiste
> > > Onofré <jb...@nanthrax.net>Time:2017 Mar 28 (Tue) 14:04To:dev
> > > <de...@beam.apache.org>Subject:Re: Call for help: let's add Splittable
> > DoFn
> > > to Spark, Flink and Apex runners
> > > Hi Aljoscha,
> > >
> > > do you need some help on this ?
> > >
> > > Regards
> > > JB
> > >
> > > On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > > > Hi,
> > > > sorry for being so slow but I’m currently traveling.
> > > >
> > > > The Flink code works but I think it could benefit from some
> refactoring
> > > > to make the code nice and maintainable.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> > > >> I add myself on the Spark runner.
> > > >>
> > > >> Regards
> > > >> JB
> > > >>
> > > >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> > > >>> Hi all,
> > > >>>
> > > >>> Let's continue the ~bi-weekly sync-ups about state of SDF support
> in
> > > >>> Spark/Flink/Apex runners.
> > > >>>
> > > >>> Spark:
> > >
> > >>> Amit, Aviem, Ismaël - when would be a good time for you; does same
> time
> > > >>> work (8am PST this Friday)? Who else would like to join?
> > > >>>
> > > >>> Flink:
> > > >>> I pinged the PR, but - Aljoscha, do you think it's worth discussing
> > > >>> anything there over a videocall?
> > > >>>
> > > >>> Apex:
> > >
> > >>> Thomas - how about same time next Monday? (9:30am PST) Who else
> would like
> > > >>> to join?
> > > >>>
> > > >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <
> > kirpichov@google.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Meeting notes:
> > > >>>> Me and Thomas had a video call and we pretty much walked through
> the
> > >
> > >>>> implementation of SDF in the runner-agnostic part and in the direct
> runner.
> > > >>>> Flink and Apex are pretty similar, so likely
> > > >>>> https://github.com/apache/beam/pull/2235
> >  (the Flink PR) will give a very
> > > >>>> good guideline as to how to do this in Apex.
> > > >>>> Will talk again in ~2 weeks; and will involve +David Yan
> > > >>>> <davidyan@google.com
> > > who is also on Apex and currently conveniently
> > >
> > >>>> works on the Google Dataflow team and, from in-person conversation,
> was
> > > >>>> interested in being involved :)
> > > >>>>
> > > >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <
> > kirpichov@google.com>
> > > >>>> wrote:
> > > >>>>
> > > >>>> Thomas - yes, 9:30 works, shall we do that?
> > > >>>>
> > >
> > >>>> JB - excellent! You can start experimenting already, using direct
> runner!
> > > >>>>
> > > >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <
> jb@nanthrax.net
> > >
> > > >>>> wrote:
> > > >>>>
> > > >>>> Hi Eugene,
> > > >>>>
> > > >>>> Thanks for the meeting notes !
> > > >>>>
> > >
> > >>>> I will be in the next call and Ismaël also provided to me some
> updates.
> > > >>>>
> > >
> > >>>> I will sync with Amit on Spark runner and start to experiment and
> test SDF
> > > >>>> on
> > > >>>> the JMS IO.
> > > >>>>
> > > >>>> Thanks !
> > > >>>> Regards
> > > >>>> JB
> > > >>>>
> > > >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > > >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> > > >>>>>
> > > >>>>> Spark has 2 types of stateful operators; a cheap one intended for
> > > >>>> updating
> > >
> > >>>>> elements (works with state but not with timers) and an expensive
> one.
> > > >>>> I.e.
> > >
> > >>>>> there's no efficient direct counterpart to Beam's keyed state
> model. In
> > >
> > >>>>> implementation of Beam State & Timers API, Spark runner will use
> the
> > >
> > >>>>> cheaper one for state and the expensive one for timers. So, for
> SDF,
> > > >>>> which
> > >
> > >>>>> in the runner-agnostic SplittableParDo expansion needs both state
> and
> > >
> > >>>>> timers, we'll need the expensive one - but this should be fine
> since with
> > >
> > >>>>> SDF the bottleneck should be in the ProcessElement call itself,
> not in
> > > >>>>> splitting/scheduling it.
> > > >>>>>
> > >
> > >>>>> For Spark batch runner, implementing SDF might be still simpler:
> runner
> > >
> > >>>>> will just not request any checkpointing. Hard parts about
> SDF/batch are
> > >
> > >>>>> dynamic rebalancing and size estimation APIs - they will be
> refined this
> > > >>>>> quarter, but it's ok to initially not have them.
> > > >>>>>
> > > >>>>> Spark runner might use a different expansion of SDF not involving
> > > >>>>> KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems
> > > >>>> transform),
> > >
> > >>>>> though still striving to reuse as much code as possible from the
> standard
> > > >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> > > >>>>>
> > > >>>>> Testing questions:
> > > >>>>> - Spark runner already implements termination on
> > > >>>>> watermarks-reaching-infinity properly.
> > >
> > >>>>> - Q: How to test that the runner actually splits? A: The code that
> splits
> > > >>>>> is in the runner-agnostic, so a runner would have to deliberately
> > > >>>> sabotage
> > > >>>>> it in order to break it - unlikely. Also, for semantics we have
> > >
> > >>>>> runner-agnostic ROS tests; but at some point will need performance
> tests
> > > >>>>> too.
> > > >>>>>
> > > >>>>> Next steps:
> > > >>>>> - Amit will look at the standard SplittableParDo expansion and
> > >
> > >>>>> implementation in Flink and Direct runner, will write up a doc
> about how
> > > >>>> to
> > > >>>>> do this in Spark.
> > > >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> > > >>>>>
> > > >>>>> Thanks all!
> > > >>>>>
> > > >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <
> > kirpichov@google.com>
> > > >>>>> wrote:
> > > >>>>>
> > >
> > >>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout
> link -
> > > >>>>>> does that work for you?
> > > >>>>>>
> > > >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <
> > thomas.weise@gmail.com>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>> Eugene,
> > > >>>>>>
> > >
> > >>>>>> I cannot make it for the call today. Would Monday morning work
> for you
> > > >>>> to
> > > >>>>>> discuss the Apex changes?
> > > >>>>>>
> > > >>>>>> Thanks
> > > >>>>>>
> > > >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> > > >>>>>> kirpichov@google.com.invalid> wrote:
> > > >>>>>>
> > >
> > >>>>>>> Hi! Please feel free to join this call, but I think we'd be
> mostly
> > >
> > >>>>>>> discussing how to do it in the Spark runner in particular; so
> we'll
> > > >>>>>>> probably need another call for Apex anyway.
> > > >>>>>>>
> > > >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <thw@apache.org
> > > wrote:
> > > >>>>>>>
> > > >>>>>>>> Hi Eugene,
> > > >>>>>>>>
> > >
> > >>>>>>>> This would work for me also. Please let me know if you want to
> keep
> > > >>>> the
> > > >>>>>>>> Apex related discussion separate or want me to join this call.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Thomas
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> > > >>>>>>>> kirpichov@google.com.invalid> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST,
> at
> > > >>>>>>> videocall
> > > >>>>>>>> by
> > > >>>>>>>>> link
> > > >>>>>>
> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > >>>>>>> ?
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <
> > amitsela33@gmail.com>
> > > >>>>>>> wrote:
> > > >>>>>>>>>
> > >
> > >>>>>>>>>> PST mornings are better, because they are evening/nights for
> me.
> > > >>>>>>> Friday
> > > >>>>>>>>>> would work-out best for me.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > > >>>>>>>>>> <ki...@google.com.invalid> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Awesome!!!
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
> > > >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in
> Pacific
> > > >>>>>> Time,
> > > >>>>>>>> and
> > > >>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour to
> an
> > > >>>>>> hour.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> > > >>>>>>>> aljoscha@apache.org>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
> > > >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> There are still two failing tests, as described in the PR.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > >>>>>>>>>>>>> +1 for a video call. I think it should be pretty straight
> > > >>>>>>> forward
> > > >>>>>>>>> for
> > > >>>>>>>>>>> the
> > >
> > >>>>>>>>>>>>> Spark runner after the work on read from UnboundedSource
> and
> > > >>>>>>>> after
> > > >>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call
> could
> > > >>>>>>> move
> > > >>>>>>>> us
> > > >>>>>>>>>>>>> forward
> > > >>>>>>>>>>>>> fast enough.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> > > >>>>>>>> kirpichov@google.com
> > > >>>>>>>>>>
> > > >>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Let us continue working on this. I am back from various
> > > >>>>>>> travels
> > > >>>>>>>>> and
> > > >>>>>>>>>>> am
> > > >>>>>>>>>>>>>> eager to help.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to
> > > >>>>>> hash
> > > >>>>>>>>> this
> > > >>>>>>>>>>> out
> > > >>>>>>>>>>>> for
> > > >>>>>>>>>>>>>> the Spark runner?
> > > >>>>>>>>>>>>>>
> > >
> > >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is
> the
> > > >>>>>>>> need
> > > >>>>>>>>>> for
> > > >>>>>>>>>>>> them
> > > >>>>>>>>>>>>>> obviated by using the (existing) runner-facing
> state/timer
> > > >>>>>>>> APIs?
> > > >>>>>>>>>>>> Should we
> > > >>>>>>>>>>>>>> have a videocall too?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thomas - what do you think about getting this into Apex
> > > >>>>>>> runner?
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress,
> but
> > > >>>>>>> it's
> > > >>>>>>>>>>>> probably a
> > > >>>>>>>>>>>>>> better idea to keep them separate since they'll involve
> a
> > > >>>>>> lot
> > > >>>>>>>> of
> > > >>>>>>>>>>>>>> runner-specific details)
> > > >>>>>>>>>>>>>>
> > >
> > >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner
> is
> > > >>>>>>>>>> currently
> > > >>>>>>>>>>>>>> waiting only on having a small service-side change
> > > >>>>>>> implemented
> > > >>>>>>>>> and
> > > >>>>>>>>>>>> rolled
> > > >>>>>>>>>>>>>> out for termination of streaming jobs.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> > > >>>>>>>> klk@google.com>
> > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > >
> > >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state &
> timer
> > > >>>>>>>> APIs;
> > > >>>>>>>>>>> they
> > > >>>>>>>>>>>> are
> > > >>>>>>>>>>>>>> lower-level and more appropriate for this. All runners
> > > >>>>>>> provide
> > > >>>>>>>>> them
> > > >>>>>>>>>>> or
> > > >>>>>>>>>>>> use
> > > >>>>>>>>>>>>>> runners/core implementations, as they are needed for
> > > >>>>>>>> triggering.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> > > >>>>>>>>>>>> kirpichov@google.com>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks Aljoscha!
> > > >>>>>>>>>>>>>>
> > >
> > >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support
> for
> > > >>>>>>>>> timers
> > > >>>>>>>>>>>> Flink
> > > >>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow
> runner
> > > >>>>>>>>> currently
> > > >>>>>>>>>>>> does
> > > >>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather, it
> uses
> > > >>>>>>> the
> > > >>>>>>>>>>>>>> runner-facing APIs (StateInternals and TimerInternals) -
> > > >>>>>>>> perhaps
> > > >>>>>>>>>>> Flink
> > > >>>>>>>>>>>>>> already implements these. We may want to change this,
> but
> > > >>>>>> for
> > > >>>>>>>> now
> > > >>>>>>>>>>> it's
> > > >>>>>>>>>>>> good
> > > >>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are not
> > > >>>>>>>>> supported
> > > >>>>>>>>>> by
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>> user-facing state API yet).
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > >>>>>>>>>>>>>> aljoscha@data-artisans.com> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting
> for
> > > >>>>>>> the
> > > >>>>>>>>>> Flink
> > > >>>>>>>>>>>> 1.2
> > > >>>>>>>>>>>>>> release (which happened this week)! There's some
> > > >>>>>> prerequisite
> > > >>>>>>>>> work
> > > >>>>>>>>>> to
> > > >>>>>>>>>>>> be
> > > >>>>>>>>>>>>>> done on the Flink runner: we'll move to the new timer
> > > >>>>>>>> interfaces
> > > >>>>>>>>>>>> introduced
> > >
> > >>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user
> facing
> > > >>>>>>>> state
> > > >>>>>>>>>> and
> > > >>>>>>>>>>>> timer
> > > >>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> > > >>>>>>>>>>> kirpichov@google.com
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
> > > >>>>>>>>>> jb@nanthrax.net
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks for the update Eugene.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regards
> > > >>>>>>>>>>>>>> JB
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> > > >>>>>>>>>>>>>> <ki...@google.com.INVALID> wrote:
> > > >>>>>>>>>>>>>>> Hello,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> > > >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow
> > > >>>>>> streaming
> > > >>>>>>>>>> runner*,
> > > >>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>> very excited about that. There's only 1 PR
> > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining,
> > > >>>>>> plus
> > > >>>>>>>>>> enabling
> > > >>>>>>>>>>>>>>> some
> > > >>>>>>>>>>>>>>> tests.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet
> quite
> > > >>>>>>>> clear
> > > >>>>>>>>> to
> > > >>>>>>>>>>> me
> > > >>>>>>>>>>>>>>> how
> > > >>>>>>>>>>>>>>> to properly implement liquid sharding
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>> https://cloud.google.com/blog/big-data/2016/05/no-shard-
> > > >>>>>>>>> left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> with
> > > >>>>>>>>>>>>>>> SDF - and the current API is not ready for that yet)
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of
> > > >>>>>>> Splittable
> > > >>>>>>>>>>> DoFn, I
> > >
> > >>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow
> streaming
> > > >>>>>>>>> runner,
> > > >>>>>>>>>>> and
> > > >>>>>>>>>>>>>>> I
> > >
> > >>>>>>>>>>>>>>> think this means it should be easy to integrate into
> other
> > > >>>>>>>>> runners
> > > >>>>>>>>>>>> too.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> > > >>>>>>>>>>>>>>> The general benefits of SDF are well-described in the
> > > >>>>>> design
> > > >>>>>>>> doc
> > > >>>>>>>>>>>>>>> (linked
> > > >>>>>>>>>>>>>>> above).
> > > >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all
> runners,
> > > >>>>>>> it'd
> > > >>>>>>>>>>> already
> > > >>>>>>>>>>>>>>> enable us to start greatly simplifying the code of
> > > >>>>>> existing
> > > >>>>>>>>>>> streaming
> > >
> > >>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and
> writing
> > > >>>>>>> new
> > > >>>>>>>>>>>>>>> connectors
> > > >>>>>>>>>>>>>>> (e.g. a really nice one to implement would be
> "directory
> > > >>>>>>>>> watcher",
> > > >>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>> continuously returns new files in a directory).
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an
> > > >>>>>>>> "unbounded
> > > >>>>>>>>>>>>>>> counter" I
> > > >>>>>>>>>>>>>>> used for my test of Dataflow runner integration:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> > > >>>>>>>>>>>>>>>    @ProcessElement
> > > >>>>>>>>>>>>>>> public ProcessContinuation process(ProcessContext c,
> > > >>>>>>>>>>>> OffsetRangeTracker
> > > >>>>>>>>>>>>>>> tracker) {
> > > >>>>>>>>>>>>>>>      for (int i =
> tracker.currentRestriction().getFrom();
> > > >>>>>>>>>>>>>>> tracker.tryClaim(i); ++i) c.output(i);
> > > >>>>>>>>>>>>>>>      return resume();
> > > >>>>>>>>>>>>>>>    }
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>    @GetInitialRestriction
> > > >>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String element) {
> > > >>>>>>>> return
> > > >>>>>>>>>> new
> > > >>>>>>>>>>>>>>> OffsetRange(0, Integer.MAX_VALUE); }
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>    @NewTracker
> > > >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange
> > > >>>>>> range) {
> > > >>>>>>>>>> return
> > > >>>>>>>>>>>> new
> > > >>>>>>>>>>>>>>> OffsetRangeTracker(range); }
> > > >>>>>>>>>>>>>>>  }
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> ====== What I'm asking ======
> > > >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into
> Spark,
> > > >>>>>>> Flink
> > > >>>>>>>>> and
> > > >>>>>>>>>>>> Apex
> > > >>>>>>>>>>>>>>> runners from people who are intimately familiar with
> them
> > > >>>>>> -
> > > >>>>>>>>>>>>>>> specifically, I
> > > >>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you
> into
> > > >>>>>>>> taking
> > > >>>>>>>>>> over
> > > >>>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> > > >>>>>>>>>>>>>>> <al...@data-artisans.com> +Maximilian Michels
> > > >>>>>>>>>>>>>>> <ma...@data-artisans.com>
> > > >>>>>>>>>>>>>>> +iemejia@gmail.com <ie...@gmail.com> +Amit Sela
> > > >>>>>>>>>>>>>>> <am...@gmail.com> +Thomas
> > > >>>>>>>>>>>>>>> Weise unless I forgot somebody.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific
> guidance
> > > >>>>>> on
> > > >>>>>>>> how
> > > >>>>>>>>> to
> > > >>>>>>>>>>> do
> > > >>>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>> myself.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> ====== If you want to help ======
> > >
> > >>>>>>>>>>>>>>> If somebody decides to take this over, in my absence
> (I'll
> > > >>>>>>> be
> > > >>>>>>>>>> mostly
> > > >>>>>>>>>>>>>>> gone
> > > >>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
> > > >>>>>>>> implementation
> > > >>>>>>>>>>>>>>> advice are +Kenn
> > > >>>>>>>>>>>>>>> Knowles <kl...@google.com> and +Daniel Mills <
> > > >>>>>>> millsd@google.com
> > > >>>>>>>>>
> > > >>>>>>>>> .
> > > >>>>>>>>>>>>>>>
> > >
> > >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the
> direct
> > > >>>>>>>>> runner:
> > > >>>>>>>>>>>>>>> - Direct runner overrides
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
> > > >>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
> > > >>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with
> > > >>>>>>>>>>> SplittableParDo
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-
> > >
> > >>>>>>>>>
> java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> (common
> > > >>>>>>>>>>>>>>> transform expansion)
> > > >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive
> > > >>>>>>>> transforms:
> > > >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and
> "SplittableProcessElements".
> > > >>>>>>>> Direct
> > > >>>>>>>>>>> runner
> > > >>>>>>>>>>>>>>> overrides the first one like this
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > >
> > >>>>>>>>>
> beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> > > >>>>>>>>>>>>>>> ,
> > > >>>>>>>>>>>>>>> and directly implements evaluation of the second one
> like
> > > >>>>>>> this
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > >
> > >>>>>>>>>
> beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> > > >>>>>>>>>>>>>>> ,
> > > >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the
> core
> > > >>>>>> of
> > > >>>>>>>> the
> > > >>>>>>>>>>> hooks
> > > >>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to be
> > > >>>>>>>> prepared
> > > >>>>>>>>> at
> > > >>>>>>>>>>>>>>> runtime
> > > >>>>>>>>>>>>>>> with some hooks (state, timers, and runner access to
> > > >>>>>>>>>>>>>>> RestrictionTracker)
> > >
> > >>>>>>>>>>>>>>> before you invoke it. I added a convenience
> implementation
> > > >>>>>>> of
> > > >>>>>>>>> the
> > > >>>>>>>>>>> hook
> > > >>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
> > > >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
> > > >>>>>>> SplittableDoFnTest
> > > >>>>>>>>>>>>>>> <
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > >>>>>>>>> 99024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/
> > > >>>>>>>>> transforms/SplittableDoFnTest.java
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> .
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> That's all it takes, really - the runner has to
> implement
> > > >>>>>>>> these
> > > >>>>>>>>>> two
> > > >>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink runners,
> it
> > > >>>>>> was
> > > >>>>>>>> not
> > > >>>>>>>>>>> quite
> > > >>>>>>>>>>>>>>> clear to me how to implement the GBKIntoKeyedWorkItems
> > > >>>>>>>>> transform,
> > > >>>>>>>>>>> e.g.
> > > >>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at
> all -
> > > >>>>>>> but
> > > >>>>>>>> it
> > > >>>>>>>>>>> seems
> > > >>>>>>>>>>>>>>> definitely possible.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> --
> > > >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> info@data-artisans.com
> > >
> > >>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146>
> <+49%2030%2055599146> <+49%2030%2055599146>
> > > >>>>>> <+49%2030%2055599146>
> > > >>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
> > > >>>>>>>>>> <+49%2030%2055599146 <(205)%20559-9146>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>> --
> > > >>>> Jean-Baptiste Onofré
> > > >>>> jbonofre@apache.org
> > > >>>> http://blog.nanthrax.net
> > > >>>> Talend - http://www.talend.com
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >> --
> > > >> Jean-Baptiste Onofré
> > > >> jbonofre@apache.org
> > > >> http://blog.nanthrax.net
> > > >> Talend - http://www.talend.com
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbonofre@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> >
>

Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Hi all,

Splittable DoFn is now available in Dataflow streaming runner, as of
https://github.com/apache/beam/pull/1898 !

Meanwhile, Flink support got disabled due to some churn as part of First
Stable Release, but it should be not hard to fix - tracked in
https://issues.apache.org/jira/browse/BEAM-2140 +Aljoscha Krettek
<al...@data-artisans.com>

Thomas - any news on Apex?

On Sat, Apr 8, 2017 at 1:28 PM Thomas Weise <th...@apache.org> wrote:

> Nice work Aljoscha!
>
> Update WRT ApexRunner: We merged some prep work in the ParDoOperator to
> weed out remnants of OldDoFn. I have almost all the changes ready to add
> the support for Splittable DoFn (for most part those follow the Flink
> runner changes). The final piece missing to support the feature (based on
> observation from the test failures) is the timer internals.
>
> Thanks,
> Thomas
>
>
> On Sat, Apr 1, 2017 at 1:17 AM, Eugene Kirpichov <
> kirpichov@google.com.invalid> wrote:
>
> > Hey all,
> >
> > The Flink PR has been merged, and thus - Flink becomes the first
> > distributed runner to support Splittable DoFn!!!
> > Thank you, Aljoscha!
> >
> > Looking forward to Spark and Apex, and continuing work on Dataflow.
> > I'll also send proposals about a couple of new ideas related to SDF next
> > week.
> >
> > On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <am...@gmail.com> wrote:
> >
> > > I will not be able to make it this weekend, too busy. Let's chat at the
> > > beginning of next week and see what's on my plate.
> > >
> > > On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <al...@apache.org>
> > > wrote:
> > >
> > > > Thanks for the offers, guys! The code is finished, though. I only
> need
> > > > to do the last touch ups.
> > > >
> > > > On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> > > > > Hi Aljoscha,
> > > > > I would like to work on the Flink runner with you.
> > > > >
> > > >
> > > Best,JingsongLee--------------------------------------------
> > ----------------------From:Jean-Baptiste
> > > > > Onofré <jb...@nanthrax.net>Time:2017 Mar 28 (Tue) 14:04To:dev
> > > > > <de...@beam.apache.org>Subject:Re: Call for help: let's add
> Splittable
> > > > DoFn
> > > > > to Spark, Flink and Apex runners
> > > > > Hi Aljoscha,
> > > > >
> > > > > do you need some help on this ?
> > > > >
> > > > > Regards
> > > > > JB
> > > > >
> > > > > On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > > > > > Hi,
> > > > > > sorry for being so slow but I’m currently traveling.
> > > > > >
> > > > > > The Flink code works but I think it could benefit from some
> > > refactoring
> > > > > > to make the code nice and maintainable.
> > > > > >
> > > > > > Best,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> > > > > >> I add myself on the Spark runner.
> > > > > >>
> > > > > >> Regards
> > > > > >> JB
> > > > > >>
> > > > > >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> > > > > >>> Hi all,
> > > > > >>>
> > > > > >>> Let's continue the ~bi-weekly sync-ups about state of SDF
> support
> > > in
> > > > > >>> Spark/Flink/Apex runners.
> > > > > >>>
> > > > > >>> Spark:
> > > > >
> > > > >>> Amit, Aviem, Ismaël - when would be a good time for you; does
> same
> > > time
> > > > > >>> work (8am PST this Friday)? Who else would like to join?
> > > > > >>>
> > > > > >>> Flink:
> > > > > >>> I pinged the PR, but - Aljoscha, do you think it's worth
> > discussing
> > > > > >>> anything there over a videocall?
> > > > > >>>
> > > > > >>> Apex:
> > > > >
> > > > >>> Thomas - how about same time next Monday? (9:30am PST) Who else
> > > would like
> > > > > >>> to join?
> > > > > >>>
> > > > > >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <
> > > > kirpichov@google.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Meeting notes:
> > > > > >>>> Me and Thomas had a video call and we pretty much walked
> through
> > > the
> > > > >
> > > > >>>> implementation of SDF in the runner-agnostic part and in the
> > direct
> > > runner.
> > > > > >>>> Flink and Apex are pretty similar, so likely
> > > > > >>>> https://github.com/apache/beam/pull/2235
> > > >  (the Flink PR) will give a very
> > > > > >>>> good guideline as to how to do this in Apex.
> > > > > >>>> Will talk again in ~2 weeks; and will involve +David Yan
> > > > > >>>> <davidyan@google.com
> > > > > who is also on Apex and currently conveniently
> > > > >
> > > > >>>> works on the Google Dataflow team and, from in-person
> > conversation,
> > > was
> > > > > >>>> interested in being involved :)
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <
> > > > kirpichov@google.com>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>> Thomas - yes, 9:30 works, shall we do that?
> > > > > >>>>
> > > > >
> > > > >>>> JB - excellent! You can start experimenting already, using
> direct
> > > runner!
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <
> > > jb@nanthrax.net
> > > > >
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>> Hi Eugene,
> > > > > >>>>
> > > > > >>>> Thanks for the meeting notes !
> > > > > >>>>
> > > > >
> > > > >>>> I will be in the next call and Ismaël also provided to me some
> > > updates.
> > > > > >>>>
> > > > >
> > > > >>>> I will sync with Amit on Spark runner and start to experiment
> and
> > > test SDF
> > > > > >>>> on
> > > > > >>>> the JMS IO.
> > > > > >>>>
> > > > > >>>> Thanks !
> > > > > >>>> Regards
> > > > > >>>> JB
> > > > > >>>>
> > > > > >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > > > > >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> > > > > >>>>>
> > > > > >>>>> Spark has 2 types of stateful operators; a cheap one intended
> > for
> > > > > >>>> updating
> > > > >
> > > > >>>>> elements (works with state but not with timers) and an
> expensive
> > > one.
> > > > > >>>> I.e.
> > > > >
> > > > >>>>> there's no efficient direct counterpart to Beam's keyed state
> > > model. In
> > > > >
> > > > >>>>> implementation of Beam State & Timers API, Spark runner will
> use
> > > the
> > > > >
> > > > >>>>> cheaper one for state and the expensive one for timers. So, for
> > > SDF,
> > > > > >>>> which
> > > > >
> > > > >>>>> in the runner-agnostic SplittableParDo expansion needs both
> state
> > > and
> > > > >
> > > > >>>>> timers, we'll need the expensive one - but this should be fine
> > > since with
> > > > >
> > > > >>>>> SDF the bottleneck should be in the ProcessElement call itself,
> > > not in
> > > > > >>>>> splitting/scheduling it.
> > > > > >>>>>
> > > > >
> > > > >>>>> For Spark batch runner, implementing SDF might be still
> simpler:
> > > runner
> > > > >
> > > > >>>>> will just not request any checkpointing. Hard parts about
> > > SDF/batch are
> > > > >
> > > > >>>>> dynamic rebalancing and size estimation APIs - they will be
> > > refined this
> > > > > >>>>> quarter, but it's ok to initially not have them.
> > > > > >>>>>
> > > > > >>>>> Spark runner might use a different expansion of SDF not
> > involving
> > > > > >>>>> KeyedWorkItem's (i.e. not overriding the
> GBKIntoKeyedWorkItems
> > > > > >>>> transform),
> > > > >
> > > > >>>>> though still striving to reuse as much code as possible from
> the
> > > standard
> > > > > >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> > > > > >>>>>
> > > > > >>>>> Testing questions:
> > > > > >>>>> - Spark runner already implements termination on
> > > > > >>>>> watermarks-reaching-infinity properly.
> > > > >
> > > > >>>>> - Q: How to test that the runner actually splits? A: The code
> > that
> > > splits
> > > > > >>>>> is in the runner-agnostic, so a runner would have to
> > deliberately
> > > > > >>>> sabotage
> > > > > >>>>> it in order to break it - unlikely. Also, for semantics we
> have
> > > > >
> > > > >>>>> runner-agnostic ROS tests; but at some point will need
> > performance
> > > tests
> > > > > >>>>> too.
> > > > > >>>>>
> > > > > >>>>> Next steps:
> > > > > >>>>> - Amit will look at the standard SplittableParDo expansion
> and
> > > > >
> > > > >>>>> implementation in Flink and Direct runner, will write up a doc
> > > about how
> > > > > >>>> to
> > > > > >>>>> do this in Spark.
> > > > > >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> > > > > >>>>>
> > > > > >>>>> Thanks all!
> > > > > >>>>>
> > > > > >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <
> > > > kirpichov@google.com>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > >
> > > > >>>>>> Yes, Monday morning works! How about also 8am PST, same
> Hangout
> > > link -
> > > > > >>>>>> does that work for you?
> > > > > >>>>>>
> > > > > >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <
> > > > thomas.weise@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>> Eugene,
> > > > > >>>>>>
> > > > >
> > > > >>>>>> I cannot make it for the call today. Would Monday morning work
> > > for you
> > > > > >>>> to
> > > > > >>>>>> discuss the Apex changes?
> > > > > >>>>>>
> > > > > >>>>>> Thanks
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> > > > > >>>>>> kirpichov@google.com.invalid> wrote:
> > > > > >>>>>>
> > > > >
> > > > >>>>>>> Hi! Please feel free to join this call, but I think we'd be
> > > mostly
> > > > >
> > > > >>>>>>> discussing how to do it in the Spark runner in particular; so
> > > we'll
> > > > > >>>>>>> probably need another call for Apex anyway.
> > > > > >>>>>>>
> > > > > >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <
> thw@apache.org
> > > > > wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Eugene,
> > > > > >>>>>>>>
> > > > >
> > > > >>>>>>>> This would work for me also. Please let me know if you want
> to
> > > keep
> > > > > >>>> the
> > > > > >>>>>>>> Apex related discussion separate or want me to join this
> > call.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Thomas
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> > > > > >>>>>>>> kirpichov@google.com.invalid> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday
> PST,
> > > at
> > > > > >>>>>>> videocall
> > > > > >>>>>>>> by
> > > > > >>>>>>>>> link
> > > > > >>>>>>
> > > https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > > > >>>>>>> ?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <
> > > > amitsela33@gmail.com>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > >
> > > > >>>>>>>>>> PST mornings are better, because they are evening/nights
> for
> > > me.
> > > > > >>>>>>> Friday
> > > > > >>>>>>>>>> would work-out best for me.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > > > > >>>>>>>>>> <ki...@google.com.invalid> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Awesome!!!
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to
> join?
> > > > > >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in
> > > Pacific
> > > > > >>>>>> Time,
> > > > > >>>>>>>> and
> > > > > >>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour
> > to
> > > an
> > > > > >>>>>> hour.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> > > > > >>>>>>>> aljoscha@apache.org>
> > > > > >>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> I whipped up a quick version for Flink that seems to
> > work:
> > > > > >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> There are still two failing tests, as described in the
> > PR.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > >>>>>>>>>>>>> +1 for a video call. I think it should be pretty
> > straight
> > > > > >>>>>>> forward
> > > > > >>>>>>>>> for
> > > > > >>>>>>>>>>> the
> > > > >
> > > > >>>>>>>>>>>>> Spark runner after the work on read from
> UnboundedSource
> > > and
> > > > > >>>>>>>> after
> > > > > >>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call
> > > could
> > > > > >>>>>>> move
> > > > > >>>>>>>> us
> > > > > >>>>>>>>>>>>> forward
> > > > > >>>>>>>>>>>>> fast enough.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> > > > > >>>>>>>> kirpichov@google.com
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Let us continue working on this. I am back from
> > various
> > > > > >>>>>>> travels
> > > > > >>>>>>>>> and
> > > > > >>>>>>>>>>> am
> > > > > >>>>>>>>>>>>>> eager to help.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a
> videocall
> > to
> > > > > >>>>>> hash
> > > > > >>>>>>>>> this
> > > > > >>>>>>>>>>> out
> > > > > >>>>>>>>>>>> for
> > > > > >>>>>>>>>>>>>> the Spark runner?
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or
> is
> > > the
> > > > > >>>>>>>> need
> > > > > >>>>>>>>>> for
> > > > > >>>>>>>>>>>> them
> > > > > >>>>>>>>>>>>>> obviated by using the (existing) runner-facing
> > > state/timer
> > > > > >>>>>>>> APIs?
> > > > > >>>>>>>>>>>> Should we
> > > > > >>>>>>>>>>>>>> have a videocall too?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thomas - what do you think about getting this into
> > Apex
> > > > > >>>>>>> runner?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> (I think videocalls will allow to make rapid
> progress,
> > > but
> > > > > >>>>>>> it's
> > > > > >>>>>>>>>>>> probably a
> > > > > >>>>>>>>>>>>>> better idea to keep them separate since they'll
> > involve
> > > a
> > > > > >>>>>> lot
> > > > > >>>>>>>> of
> > > > > >>>>>>>>>>>>>> runner-specific details)
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming
> runner
> > > is
> > > > > >>>>>>>>>> currently
> > > > > >>>>>>>>>>>>>> waiting only on having a small service-side change
> > > > > >>>>>>> implemented
> > > > > >>>>>>>>> and
> > > > > >>>>>>>>>>>> rolled
> > > > > >>>>>>>>>>>>>> out for termination of streaming jobs.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> > > > > >>>>>>>> klk@google.com>
> > > > > >>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state &
> > > timer
> > > > > >>>>>>>> APIs;
> > > > > >>>>>>>>>>> they
> > > > > >>>>>>>>>>>> are
> > > > > >>>>>>>>>>>>>> lower-level and more appropriate for this. All
> runners
> > > > > >>>>>>> provide
> > > > > >>>>>>>>> them
> > > > > >>>>>>>>>>> or
> > > > > >>>>>>>>>>>> use
> > > > > >>>>>>>>>>>>>> runners/core implementations, as they are needed for
> > > > > >>>>>>>> triggering.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> > > > > >>>>>>>>>>>> kirpichov@google.com>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks Aljoscha!
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of
> support
> > > for
> > > > > >>>>>>>>> timers
> > > > > >>>>>>>>>>>> Flink
> > > > > >>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow
> > > runner
> > > > > >>>>>>>>> currently
> > > > > >>>>>>>>>>>> does
> > > > > >>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather,
> it
> > > uses
> > > > > >>>>>>> the
> > > > > >>>>>>>>>>>>>> runner-facing APIs (StateInternals and
> > TimerInternals) -
> > > > > >>>>>>>> perhaps
> > > > > >>>>>>>>>>> Flink
> > > > > >>>>>>>>>>>>>> already implements these. We may want to change
> this,
> > > but
> > > > > >>>>>> for
> > > > > >>>>>>>> now
> > > > > >>>>>>>>>>> it's
> > > > > >>>>>>>>>>>> good
> > > > > >>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are
> > not
> > > > > >>>>>>>>> supported
> > > > > >>>>>>>>>> by
> > > > > >>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>> user-facing state API yet).
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > > > >>>>>>>>>>>>>> aljoscha@data-artisans.com> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I've wanted to do this for a while now but was
> waiting
> > > for
> > > > > >>>>>>> the
> > > > > >>>>>>>>>> Flink
> > > > > >>>>>>>>>>>> 1.2
> > > > > >>>>>>>>>>>>>> release (which happened this week)! There's some
> > > > > >>>>>> prerequisite
> > > > > >>>>>>>>> work
> > > > > >>>>>>>>>> to
> > > > > >>>>>>>>>>>> be
> > > > > >>>>>>>>>>>>>> done on the Flink runner: we'll move to the new
> timer
> > > > > >>>>>>>> interfaces
> > > > > >>>>>>>>>>>> introduced
> > > > >
> > > > >>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user
> > > facing
> > > > > >>>>>>>> state
> > > > > >>>>>>>>>> and
> > > > > >>>>>>>>>>>> timer
> > > > > >>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> > > > > >>>>>>>>>>> kirpichov@google.com
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré
> <
> > > > > >>>>>>>>>> jb@nanthrax.net
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the update Eugene.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> > > > > >>>>>>>>>>>>>> <ki...@google.com.INVALID> wrote:
> > > > > >>>>>>>>>>>>>>> Hello,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> > > > > >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow
> > > > > >>>>>> streaming
> > > > > >>>>>>>>>> runner*,
> > > > > >>>>>>>>>>>> and
> > > > > >>>>>>>>>>>>>>> very excited about that. There's only 1 PR
> > > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898>
> > remaining,
> > > > > >>>>>> plus
> > > > > >>>>>>>>>> enabling
> > > > > >>>>>>>>>>>>>>> some
> > > > > >>>>>>>>>>>>>>> tests.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet
> > > quite
> > > > > >>>>>>>> clear
> > > > > >>>>>>>>> to
> > > > > >>>>>>>>>>> me
> > > > > >>>>>>>>>>>>>>> how
> > > > > >>>>>>>>>>>>>>> to properly implement liquid sharding
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> https://cloud.google.com/blog/big-data/2016/05/no-shard-
> > > > > >>>>>>>>> left-behind-dynamic-work-rebalancing-in-google-cloud-
> > dataflow
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> with
> > > > > >>>>>>>>>>>>>>> SDF - and the current API is not ready for that
> yet)
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of
> > > > > >>>>>>> Splittable
> > > > > >>>>>>>>>>> DoFn, I
> > > > >
> > > > >>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow
> > > streaming
> > > > > >>>>>>>>> runner,
> > > > > >>>>>>>>>>> and
> > > > > >>>>>>>>>>>>>>> I
> > > > >
> > > > >>>>>>>>>>>>>>> think this means it should be easy to integrate into
> > > other
> > > > > >>>>>>>>> runners
> > > > > >>>>>>>>>>>> too.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> > > > > >>>>>>>>>>>>>>> The general benefits of SDF are well-described in
> the
> > > > > >>>>>> design
> > > > > >>>>>>>> doc
> > > > > >>>>>>>>>>>>>>> (linked
> > > > > >>>>>>>>>>>>>>> above).
> > > > > >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all
> > > runners,
> > > > > >>>>>>> it'd
> > > > > >>>>>>>>>>> already
> > > > > >>>>>>>>>>>>>>> enable us to start greatly simplifying the code of
> > > > > >>>>>> existing
> > > > > >>>>>>>>>>> streaming
> > > > >
> > > > >>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and
> > > writing
> > > > > >>>>>>> new
> > > > > >>>>>>>>>>>>>>> connectors
> > > > > >>>>>>>>>>>>>>> (e.g. a really nice one to implement would be
> > > "directory
> > > > > >>>>>>>>> watcher",
> > > > > >>>>>>>>>>>> that
> > > > > >>>>>>>>>>>>>>> continuously returns new files in a directory).
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of
> an
> > > > > >>>>>>>> "unbounded
> > > > > >>>>>>>>>>>>>>> counter" I
> > > > > >>>>>>>>>>>>>>> used for my test of Dataflow runner integration:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> > > > > >>>>>>>>>>>>>>>    @ProcessElement
> > > > > >>>>>>>>>>>>>>> public ProcessContinuation process(ProcessContext
> c,
> > > > > >>>>>>>>>>>> OffsetRangeTracker
> > > > > >>>>>>>>>>>>>>> tracker) {
> > > > > >>>>>>>>>>>>>>>      for (int i =
> > > tracker.currentRestriction().getFrom();
> > > > > >>>>>>>>>>>>>>> tracker.tryClaim(i); ++i) c.output(i);
> > > > > >>>>>>>>>>>>>>>      return resume();
> > > > > >>>>>>>>>>>>>>>    }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>    @GetInitialRestriction
> > > > > >>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String
> > element) {
> > > > > >>>>>>>> return
> > > > > >>>>>>>>>> new
> > > > > >>>>>>>>>>>>>>> OffsetRange(0, Integer.MAX_VALUE); }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>    @NewTracker
> > > > > >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange
> > > > > >>>>>> range) {
> > > > > >>>>>>>>>> return
> > > > > >>>>>>>>>>>> new
> > > > > >>>>>>>>>>>>>>> OffsetRangeTracker(range); }
> > > > > >>>>>>>>>>>>>>>  }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== What I'm asking ======
> > > > > >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into
> > > Spark,
> > > > > >>>>>>> Flink
> > > > > >>>>>>>>> and
> > > > > >>>>>>>>>>>> Apex
> > > > > >>>>>>>>>>>>>>> runners from people who are intimately familiar
> with
> > > them
> > > > > >>>>>> -
> > > > > >>>>>>>>>>>>>>> specifically, I
> > > > > >>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you
> > > into
> > > > > >>>>>>>> taking
> > > > > >>>>>>>>>> over
> > > > > >>>>>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha
> > Krettek
> > > > > >>>>>>>>>>>>>>> <al...@data-artisans.com> +Maximilian Michels
> > > > > >>>>>>>>>>>>>>> <ma...@data-artisans.com>
> > > > > >>>>>>>>>>>>>>> +iemejia@gmail.com <ie...@gmail.com> +Amit Sela
> > > > > >>>>>>>>>>>>>>> <am...@gmail.com> +Thomas
> > > > > >>>>>>>>>>>>>>> Weise unless I forgot somebody.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific
> > > guidance
> > > > > >>>>>> on
> > > > > >>>>>>>> how
> > > > > >>>>>>>>> to
> > > > > >>>>>>>>>>> do
> > > > > >>>>>>>>>>>>>>> it
> > > > > >>>>>>>>>>>>>>> myself.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== If you want to help ======
> > > > >
> > > > >>>>>>>>>>>>>>> If somebody decides to take this over, in my absence
> > > (I'll
> > > > > >>>>>>> be
> > > > > >>>>>>>>>> mostly
> > > > > >>>>>>>>>>>>>>> gone
> > > > > >>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
> > > > > >>>>>>>> implementation
> > > > > >>>>>>>>>>>>>>> advice are +Kenn
> > > > > >>>>>>>>>>>>>>> Knowles <kl...@google.com> and +Daniel Mills <
> > > > > >>>>>>> millsd@google.com
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> .
> > > > > >>>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the
> > > direct
> > > > > >>>>>>>>> runner:
> > > > > >>>>>>>>>>>>>>> - Direct runner overrides
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
> > > > > >>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
> > > > > >>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it
> with
> > > > > >>>>>>>>>>> SplittableParDo
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> https://github.com/apache/beam/blob/master/runners/core-
> > > > >
> > > > >>>>>>>>>
> > > java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> (common
> > > > > >>>>>>>>>>>>>>> transform expansion)
> > > > > >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific
> primitive
> > > > > >>>>>>>> transforms:
> > > > > >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and
> > > "SplittableProcessElements".
> > > > > >>>>>>>> Direct
> > > > > >>>>>>>>>>> runner
> > > > > >>>>>>>>>>>>>>> overrides the first one like this
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > > >
> > > > >>>>>>>>>
> > > beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> > > > > >>>>>>>>>>>>>>> ,
> > > > > >>>>>>>>>>>>>>> and directly implements evaluation of the second
> one
> > > like
> > > > > >>>>>>> this
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > > >
> > > > >>>>>>>>>
> > > beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> > > > > >>>>>>>>>>>>>>> ,
> > > > > >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> > > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the
> > > core
> > > > > >>>>>> of
> > > > > >>>>>>>> the
> > > > > >>>>>>>>>>> hooks
> > > > > >>>>>>>>>>>> is
> > > > > >>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to
> > be
> > > > > >>>>>>>> prepared
> > > > > >>>>>>>>> at
> > > > > >>>>>>>>>>>>>>> runtime
> > > > > >>>>>>>>>>>>>>> with some hooks (state, timers, and runner access
> to
> > > > > >>>>>>>>>>>>>>> RestrictionTracker)
> > > > >
> > > > >>>>>>>>>>>>>>> before you invoke it. I added a convenience
> > > implementation
> > > > > >>>>>>> of
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>>>> hook
> > > > > >>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
> > > > > >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
> > > > > >>>>>>> SplittableDoFnTest
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/sdks/java/core/src/
> > test/java/org/apache/beam/sdk/
> > > > > >>>>>>>>> transforms/SplittableDoFnTest.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> .
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> That's all it takes, really - the runner has to
> > > implement
> > > > > >>>>>>>> these
> > > > > >>>>>>>>>> two
> > > > > >>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink
> runners,
> > > it
> > > > > >>>>>> was
> > > > > >>>>>>>> not
> > > > > >>>>>>>>>>> quite
> > > > > >>>>>>>>>>>>>>> clear to me how to implement the
> > GBKIntoKeyedWorkItems
> > > > > >>>>>>>>> transform,
> > > > > >>>>>>>>>>> e.g.
> > > > > >>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at
> > > all -
> > > > > >>>>>>> but
> > > > > >>>>>>>> it
> > > > > >>>>>>>>>>> seems
> > > > > >>>>>>>>>>>>>>> definitely possible.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks!
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> --
> > > > > >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963
> > Berlin
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> info@data-artisans.com
> > > > >
> > > > >>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146>
> <+49%2030%2055599146>
> > > <+49%2030%2055599146> <+49%2030%2055599146>
> > > > > >>>>>> <+49%2030%2055599146>
> > > > > >>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
> > > > > >>>>>>>>>> <+49%2030%2055599146 <(205)%20559-9146>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB
> 158244
> > B
> > > > > >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> --
> > > > > >>>> Jean-Baptiste Onofré
> > > > > >>>> jbonofre@apache.org
> > > > > >>>> http://blog.nanthrax.net
> > > > > >>>> Talend - http://www.talend.com
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >> --
> > > > > >> Jean-Baptiste Onofré
> > > > > >> jbonofre@apache.org
> > > > > >> http://blog.nanthrax.net
> > > > > >> Talend - http://www.talend.com
> > > > >
> > > > > --
> > > > > Jean-Baptiste Onofré
> > > > > jbonofre@apache.org
> > > > > http://blog.nanthrax.net
> > > > > Talend - http://www.talend.com
> > > >
> > >
> >
>

Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners

Posted by Thomas Weise <th...@apache.org>.
Nice work Aljoscha!

Update WRT ApexRunner: We merged some prep work in the ParDoOperator to
weed out remnants of OldDoFn. I have almost all the changes ready to add
the support for Splittable DoFn (for most part those follow the Flink
runner changes). The final piece missing to support the feature (based on
observation from the test failures) is the timer internals.

Thanks,
Thomas


On Sat, Apr 1, 2017 at 1:17 AM, Eugene Kirpichov <
kirpichov@google.com.invalid> wrote:

> Hey all,
>
> The Flink PR has been merged, and thus - Flink becomes the first
> distributed runner to support Splittable DoFn!!!
> Thank you, Aljoscha!
>
> Looking forward to Spark and Apex, and continuing work on Dataflow.
> I'll also send proposals about a couple of new ideas related to SDF next
> week.
>
> On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <am...@gmail.com> wrote:
>
> > I will not be able to make it this weekend, too busy. Let's chat at the
> > beginning of next week and see what's on my plate.
> >
> > On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Thanks for the offers, guys! The code is finished, though. I only need
> > > to do the last touch ups.
> > >
> > > On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> > > > Hi Aljoscha,
> > > > I would like to work on the Flink runner with you.
> > > >
> > >
> > Best,JingsongLee--------------------------------------------
> ----------------------From:Jean-Baptiste
> > > > Onofré <jb...@nanthrax.net>Time:2017 Mar 28 (Tue) 14:04To:dev
> > > > <de...@beam.apache.org>Subject:Re: Call for help: let's add Splittable
> > > DoFn
> > > > to Spark, Flink and Apex runners
> > > > Hi Aljoscha,
> > > >
> > > > do you need some help on this ?
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > > > > Hi,
> > > > > sorry for being so slow but I’m currently traveling.
> > > > >
> > > > > The Flink code works but I think it could benefit from some
> > refactoring
> > > > > to make the code nice and maintainable.
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> > > > >> I add myself on the Spark runner.
> > > > >>
> > > > >> Regards
> > > > >> JB
> > > > >>
> > > > >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> > > > >>> Hi all,
> > > > >>>
> > > > >>> Let's continue the ~bi-weekly sync-ups about state of SDF support
> > in
> > > > >>> Spark/Flink/Apex runners.
> > > > >>>
> > > > >>> Spark:
> > > >
> > > >>> Amit, Aviem, Ismaël - when would be a good time for you; does same
> > time
> > > > >>> work (8am PST this Friday)? Who else would like to join?
> > > > >>>
> > > > >>> Flink:
> > > > >>> I pinged the PR, but - Aljoscha, do you think it's worth
> discussing
> > > > >>> anything there over a videocall?
> > > > >>>
> > > > >>> Apex:
> > > >
> > > >>> Thomas - how about same time next Monday? (9:30am PST) Who else
> > would like
> > > > >>> to join?
> > > > >>>
> > > > >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <
> > > kirpichov@google.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Meeting notes:
> > > > >>>> Me and Thomas had a video call and we pretty much walked through
> > the
> > > >
> > > >>>> implementation of SDF in the runner-agnostic part and in the
> direct
> > runner.
> > > > >>>> Flink and Apex are pretty similar, so likely
> > > > >>>> https://github.com/apache/beam/pull/2235
> > >  (the Flink PR) will give a very
> > > > >>>> good guideline as to how to do this in Apex.
> > > > >>>> Will talk again in ~2 weeks; and will involve +David Yan
> > > > >>>> <davidyan@google.com
> > > > who is also on Apex and currently conveniently
> > > >
> > > >>>> works on the Google Dataflow team and, from in-person
> conversation,
> > was
> > > > >>>> interested in being involved :)
> > > > >>>>
> > > > >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <
> > > kirpichov@google.com>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> Thomas - yes, 9:30 works, shall we do that?
> > > > >>>>
> > > >
> > > >>>> JB - excellent! You can start experimenting already, using direct
> > runner!
> > > > >>>>
> > > > >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <
> > jb@nanthrax.net
> > > >
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> Hi Eugene,
> > > > >>>>
> > > > >>>> Thanks for the meeting notes !
> > > > >>>>
> > > >
> > > >>>> I will be in the next call and Ismaël also provided to me some
> > updates.
> > > > >>>>
> > > >
> > > >>>> I will sync with Amit on Spark runner and start to experiment and
> > test SDF
> > > > >>>> on
> > > > >>>> the JMS IO.
> > > > >>>>
> > > > >>>> Thanks !
> > > > >>>> Regards
> > > > >>>> JB
> > > > >>>>
> > > > >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > > > >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> > > > >>>>>
> > > > >>>>> Spark has 2 types of stateful operators; a cheap one intended
> for
> > > > >>>> updating
> > > >
> > > >>>>> elements (works with state but not with timers) and an expensive
> > one.
> > > > >>>> I.e.
> > > >
> > > >>>>> there's no efficient direct counterpart to Beam's keyed state
> > model. In
> > > >
> > > >>>>> implementation of Beam State & Timers API, Spark runner will use
> > the
> > > >
> > > >>>>> cheaper one for state and the expensive one for timers. So, for
> > SDF,
> > > > >>>> which
> > > >
> > > >>>>> in the runner-agnostic SplittableParDo expansion needs both state
> > and
> > > >
> > > >>>>> timers, we'll need the expensive one - but this should be fine
> > since with
> > > >
> > > >>>>> SDF the bottleneck should be in the ProcessElement call itself,
> > not in
> > > > >>>>> splitting/scheduling it.
> > > > >>>>>
> > > >
> > > >>>>> For Spark batch runner, implementing SDF might be still simpler:
> > runner
> > > >
> > > >>>>> will just not request any checkpointing. Hard parts about
> > SDF/batch are
> > > >
> > > >>>>> dynamic rebalancing and size estimation APIs - they will be
> > refined this
> > > > >>>>> quarter, but it's ok to initially not have them.
> > > > >>>>>
> > > > >>>>> Spark runner might use a different expansion of SDF not
> involving
> > > > >>>>> KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems
> > > > >>>> transform),
> > > >
> > > >>>>> though still striving to reuse as much code as possible from the
> > standard
> > > > >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> > > > >>>>>
> > > > >>>>> Testing questions:
> > > > >>>>> - Spark runner already implements termination on
> > > > >>>>> watermarks-reaching-infinity properly.
> > > >
> > > >>>>> - Q: How to test that the runner actually splits? A: The code
> that
> > splits
> > > > >>>>> is in the runner-agnostic, so a runner would have to
> deliberately
> > > > >>>> sabotage
> > > > >>>>> it in order to break it - unlikely. Also, for semantics we have
> > > >
> > > >>>>> runner-agnostic ROS tests; but at some point will need
> performance
> > tests
> > > > >>>>> too.
> > > > >>>>>
> > > > >>>>> Next steps:
> > > > >>>>> - Amit will look at the standard SplittableParDo expansion and
> > > >
> > > >>>>> implementation in Flink and Direct runner, will write up a doc
> > about how
> > > > >>>> to
> > > > >>>>> do this in Spark.
> > > > >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> > > > >>>>>
> > > > >>>>> Thanks all!
> > > > >>>>>
> > > > >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <
> > > kirpichov@google.com>
> > > > >>>>> wrote:
> > > > >>>>>
> > > >
> > > >>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout
> > link -
> > > > >>>>>> does that work for you?
> > > > >>>>>>
> > > > >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <
> > > thomas.weise@gmail.com>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>> Eugene,
> > > > >>>>>>
> > > >
> > > >>>>>> I cannot make it for the call today. Would Monday morning work
> > for you
> > > > >>>> to
> > > > >>>>>> discuss the Apex changes?
> > > > >>>>>>
> > > > >>>>>> Thanks
> > > > >>>>>>
> > > > >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> > > > >>>>>> kirpichov@google.com.invalid> wrote:
> > > > >>>>>>
> > > >
> > > >>>>>>> Hi! Please feel free to join this call, but I think we'd be
> > mostly
> > > >
> > > >>>>>>> discussing how to do it in the Spark runner in particular; so
> > we'll
> > > > >>>>>>> probably need another call for Apex anyway.
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <thw@apache.org
> > > > wrote:
> > > > >>>>>>>
> > > > >>>>>>>> Hi Eugene,
> > > > >>>>>>>>
> > > >
> > > >>>>>>>> This would work for me also. Please let me know if you want to
> > keep
> > > > >>>> the
> > > > >>>>>>>> Apex related discussion separate or want me to join this
> call.
> > > > >>>>>>>>
> > > > >>>>>>>> Thanks,
> > > > >>>>>>>> Thomas
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> > > > >>>>>>>> kirpichov@google.com.invalid> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST,
> > at
> > > > >>>>>>> videocall
> > > > >>>>>>>> by
> > > > >>>>>>>>> link
> > > > >>>>>>
> > https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > > >>>>>>> ?
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <
> > > amitsela33@gmail.com>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>>
> > > >
> > > >>>>>>>>>> PST mornings are better, because they are evening/nights for
> > me.
> > > > >>>>>>> Friday
> > > > >>>>>>>>>> would work-out best for me.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > > > >>>>>>>>>> <ki...@google.com.invalid> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Awesome!!!
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
> > > > >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in
> > Pacific
> > > > >>>>>> Time,
> > > > >>>>>>>> and
> > > > >>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour
> to
> > an
> > > > >>>>>> hour.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> > > > >>>>>>>> aljoscha@apache.org>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> I whipped up a quick version for Flink that seems to
> work:
> > > > >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> There are still two failing tests, as described in the
> PR.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > >>>>>>>>>>>>> +1 for a video call. I think it should be pretty
> straight
> > > > >>>>>>> forward
> > > > >>>>>>>>> for
> > > > >>>>>>>>>>> the
> > > >
> > > >>>>>>>>>>>>> Spark runner after the work on read from UnboundedSource
> > and
> > > > >>>>>>>> after
> > > > >>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call
> > could
> > > > >>>>>>> move
> > > > >>>>>>>> us
> > > > >>>>>>>>>>>>> forward
> > > > >>>>>>>>>>>>> fast enough.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> > > > >>>>>>>> kirpichov@google.com
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Let us continue working on this. I am back from
> various
> > > > >>>>>>> travels
> > > > >>>>>>>>> and
> > > > >>>>>>>>>>> am
> > > > >>>>>>>>>>>>>> eager to help.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall
> to
> > > > >>>>>> hash
> > > > >>>>>>>>> this
> > > > >>>>>>>>>>> out
> > > > >>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>> the Spark runner?
> > > > >>>>>>>>>>>>>>
> > > >
> > > >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is
> > the
> > > > >>>>>>>> need
> > > > >>>>>>>>>> for
> > > > >>>>>>>>>>>> them
> > > > >>>>>>>>>>>>>> obviated by using the (existing) runner-facing
> > state/timer
> > > > >>>>>>>> APIs?
> > > > >>>>>>>>>>>> Should we
> > > > >>>>>>>>>>>>>> have a videocall too?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thomas - what do you think about getting this into
> Apex
> > > > >>>>>>> runner?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress,
> > but
> > > > >>>>>>> it's
> > > > >>>>>>>>>>>> probably a
> > > > >>>>>>>>>>>>>> better idea to keep them separate since they'll
> involve
> > a
> > > > >>>>>> lot
> > > > >>>>>>>> of
> > > > >>>>>>>>>>>>>> runner-specific details)
> > > > >>>>>>>>>>>>>>
> > > >
> > > >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner
> > is
> > > > >>>>>>>>>> currently
> > > > >>>>>>>>>>>>>> waiting only on having a small service-side change
> > > > >>>>>>> implemented
> > > > >>>>>>>>> and
> > > > >>>>>>>>>>>> rolled
> > > > >>>>>>>>>>>>>> out for termination of streaming jobs.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> > > > >>>>>>>> klk@google.com>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > >
> > > >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state &
> > timer
> > > > >>>>>>>> APIs;
> > > > >>>>>>>>>>> they
> > > > >>>>>>>>>>>> are
> > > > >>>>>>>>>>>>>> lower-level and more appropriate for this. All runners
> > > > >>>>>>> provide
> > > > >>>>>>>>> them
> > > > >>>>>>>>>>> or
> > > > >>>>>>>>>>>> use
> > > > >>>>>>>>>>>>>> runners/core implementations, as they are needed for
> > > > >>>>>>>> triggering.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> > > > >>>>>>>>>>>> kirpichov@google.com>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks Aljoscha!
> > > > >>>>>>>>>>>>>>
> > > >
> > > >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support
> > for
> > > > >>>>>>>>> timers
> > > > >>>>>>>>>>>> Flink
> > > > >>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow
> > runner
> > > > >>>>>>>>> currently
> > > > >>>>>>>>>>>> does
> > > > >>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather, it
> > uses
> > > > >>>>>>> the
> > > > >>>>>>>>>>>>>> runner-facing APIs (StateInternals and
> TimerInternals) -
> > > > >>>>>>>> perhaps
> > > > >>>>>>>>>>> Flink
> > > > >>>>>>>>>>>>>> already implements these. We may want to change this,
> > but
> > > > >>>>>> for
> > > > >>>>>>>> now
> > > > >>>>>>>>>>> it's
> > > > >>>>>>>>>>>> good
> > > > >>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are
> not
> > > > >>>>>>>>> supported
> > > > >>>>>>>>>> by
> > > > >>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> user-facing state API yet).
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > > >>>>>>>>>>>>>> aljoscha@data-artisans.com> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting
> > for
> > > > >>>>>>> the
> > > > >>>>>>>>>> Flink
> > > > >>>>>>>>>>>> 1.2
> > > > >>>>>>>>>>>>>> release (which happened this week)! There's some
> > > > >>>>>> prerequisite
> > > > >>>>>>>>> work
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>> done on the Flink runner: we'll move to the new timer
> > > > >>>>>>>> interfaces
> > > > >>>>>>>>>>>> introduced
> > > >
> > > >>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user
> > facing
> > > > >>>>>>>> state
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>> timer
> > > > >>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> > > > >>>>>>>>>>> kirpichov@google.com
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
> > > > >>>>>>>>>> jb@nanthrax.net
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the update Eugene.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Regards
> > > > >>>>>>>>>>>>>> JB
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> > > > >>>>>>>>>>>>>> <ki...@google.com.INVALID> wrote:
> > > > >>>>>>>>>>>>>>> Hello,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> > > > >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow
> > > > >>>>>> streaming
> > > > >>>>>>>>>> runner*,
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> very excited about that. There's only 1 PR
> > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898>
> remaining,
> > > > >>>>>> plus
> > > > >>>>>>>>>> enabling
> > > > >>>>>>>>>>>>>>> some
> > > > >>>>>>>>>>>>>>> tests.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet
> > quite
> > > > >>>>>>>> clear
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>> me
> > > > >>>>>>>>>>>>>>> how
> > > > >>>>>>>>>>>>>>> to properly implement liquid sharding
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> https://cloud.google.com/blog/big-data/2016/05/no-shard-
> > > > >>>>>>>>> left-behind-dynamic-work-rebalancing-in-google-cloud-
> dataflow
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> with
> > > > >>>>>>>>>>>>>>> SDF - and the current API is not ready for that yet)
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of
> > > > >>>>>>> Splittable
> > > > >>>>>>>>>>> DoFn, I
> > > >
> > > >>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow
> > streaming
> > > > >>>>>>>>> runner,
> > > > >>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> I
> > > >
> > > >>>>>>>>>>>>>>> think this means it should be easy to integrate into
> > other
> > > > >>>>>>>>> runners
> > > > >>>>>>>>>>>> too.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> > > > >>>>>>>>>>>>>>> The general benefits of SDF are well-described in the
> > > > >>>>>> design
> > > > >>>>>>>> doc
> > > > >>>>>>>>>>>>>>> (linked
> > > > >>>>>>>>>>>>>>> above).
> > > > >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all
> > runners,
> > > > >>>>>>> it'd
> > > > >>>>>>>>>>> already
> > > > >>>>>>>>>>>>>>> enable us to start greatly simplifying the code of
> > > > >>>>>> existing
> > > > >>>>>>>>>>> streaming
> > > >
> > > >>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and
> > writing
> > > > >>>>>>> new
> > > > >>>>>>>>>>>>>>> connectors
> > > > >>>>>>>>>>>>>>> (e.g. a really nice one to implement would be
> > "directory
> > > > >>>>>>>>> watcher",
> > > > >>>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>> continuously returns new files in a directory).
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an
> > > > >>>>>>>> "unbounded
> > > > >>>>>>>>>>>>>>> counter" I
> > > > >>>>>>>>>>>>>>> used for my test of Dataflow runner integration:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> > > > >>>>>>>>>>>>>>>    @ProcessElement
> > > > >>>>>>>>>>>>>>> public ProcessContinuation process(ProcessContext c,
> > > > >>>>>>>>>>>> OffsetRangeTracker
> > > > >>>>>>>>>>>>>>> tracker) {
> > > > >>>>>>>>>>>>>>>      for (int i =
> > tracker.currentRestriction().getFrom();
> > > > >>>>>>>>>>>>>>> tracker.tryClaim(i); ++i) c.output(i);
> > > > >>>>>>>>>>>>>>>      return resume();
> > > > >>>>>>>>>>>>>>>    }
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>    @GetInitialRestriction
> > > > >>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String
> element) {
> > > > >>>>>>>> return
> > > > >>>>>>>>>> new
> > > > >>>>>>>>>>>>>>> OffsetRange(0, Integer.MAX_VALUE); }
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>    @NewTracker
> > > > >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange
> > > > >>>>>> range) {
> > > > >>>>>>>>>> return
> > > > >>>>>>>>>>>> new
> > > > >>>>>>>>>>>>>>> OffsetRangeTracker(range); }
> > > > >>>>>>>>>>>>>>>  }
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> ====== What I'm asking ======
> > > > >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into
> > Spark,
> > > > >>>>>>> Flink
> > > > >>>>>>>>> and
> > > > >>>>>>>>>>>> Apex
> > > > >>>>>>>>>>>>>>> runners from people who are intimately familiar with
> > them
> > > > >>>>>> -
> > > > >>>>>>>>>>>>>>> specifically, I
> > > > >>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you
> > into
> > > > >>>>>>>> taking
> > > > >>>>>>>>>> over
> > > > >>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha
> Krettek
> > > > >>>>>>>>>>>>>>> <al...@data-artisans.com> +Maximilian Michels
> > > > >>>>>>>>>>>>>>> <ma...@data-artisans.com>
> > > > >>>>>>>>>>>>>>> +iemejia@gmail.com <ie...@gmail.com> +Amit Sela
> > > > >>>>>>>>>>>>>>> <am...@gmail.com> +Thomas
> > > > >>>>>>>>>>>>>>> Weise unless I forgot somebody.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific
> > guidance
> > > > >>>>>> on
> > > > >>>>>>>> how
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>> do
> > > > >>>>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>> myself.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> ====== If you want to help ======
> > > >
> > > >>>>>>>>>>>>>>> If somebody decides to take this over, in my absence
> > (I'll
> > > > >>>>>>> be
> > > > >>>>>>>>>> mostly
> > > > >>>>>>>>>>>>>>> gone
> > > > >>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
> > > > >>>>>>>> implementation
> > > > >>>>>>>>>>>>>>> advice are +Kenn
> > > > >>>>>>>>>>>>>>> Knowles <kl...@google.com> and +Daniel Mills <
> > > > >>>>>>> millsd@google.com
> > > > >>>>>>>>>
> > > > >>>>>>>>> .
> > > > >>>>>>>>>>>>>>>
> > > >
> > > >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the
> > direct
> > > > >>>>>>>>> runner:
> > > > >>>>>>>>>>>>>>> - Direct runner overrides
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
> > > > >>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
> > > > >>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with
> > > > >>>>>>>>>>> SplittableParDo
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-
> > > >
> > > >>>>>>>>>
> > java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> (common
> > > > >>>>>>>>>>>>>>> transform expansion)
> > > > >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive
> > > > >>>>>>>> transforms:
> > > > >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and
> > "SplittableProcessElements".
> > > > >>>>>>>> Direct
> > > > >>>>>>>>>>> runner
> > > > >>>>>>>>>>>>>>> overrides the first one like this
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > >
> > > >>>>>>>>>
> > beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> > > > >>>>>>>>>>>>>>> ,
> > > > >>>>>>>>>>>>>>> and directly implements evaluation of the second one
> > like
> > > > >>>>>>> this
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > >
> > > >>>>>>>>>
> > beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> > > > >>>>>>>>>>>>>>> ,
> > > > >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the
> > core
> > > > >>>>>> of
> > > > >>>>>>>> the
> > > > >>>>>>>>>>> hooks
> > > > >>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to
> be
> > > > >>>>>>>> prepared
> > > > >>>>>>>>> at
> > > > >>>>>>>>>>>>>>> runtime
> > > > >>>>>>>>>>>>>>> with some hooks (state, timers, and runner access to
> > > > >>>>>>>>>>>>>>> RestrictionTracker)
> > > >
> > > >>>>>>>>>>>>>>> before you invoke it. I added a convenience
> > implementation
> > > > >>>>>>> of
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>> hook
> > > > >>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
> > > > >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
> > > > >>>>>>> SplittableDoFnTest
> > > > >>>>>>>>>>>>>>> <
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > >>>>>>>>> 99024d3a1f/sdks/java/core/src/
> test/java/org/apache/beam/sdk/
> > > > >>>>>>>>> transforms/SplittableDoFnTest.java
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> .
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> That's all it takes, really - the runner has to
> > implement
> > > > >>>>>>>> these
> > > > >>>>>>>>>> two
> > > > >>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink runners,
> > it
> > > > >>>>>> was
> > > > >>>>>>>> not
> > > > >>>>>>>>>>> quite
> > > > >>>>>>>>>>>>>>> clear to me how to implement the
> GBKIntoKeyedWorkItems
> > > > >>>>>>>>> transform,
> > > > >>>>>>>>>>> e.g.
> > > > >>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at
> > all -
> > > > >>>>>>> but
> > > > >>>>>>>> it
> > > > >>>>>>>>>>> seems
> > > > >>>>>>>>>>>>>>> definitely possible.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks!
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963
> Berlin
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> info@data-artisans.com
> > > >
> > > >>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146>
> > <+49%2030%2055599146> <+49%2030%2055599146>
> > > > >>>>>> <+49%2030%2055599146>
> > > > >>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
> > > > >>>>>>>>>> <+49%2030%2055599146 <(205)%20559-9146>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244
> B
> > > > >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>> --
> > > > >>>> Jean-Baptiste Onofré
> > > > >>>> jbonofre@apache.org
> > > > >>>> http://blog.nanthrax.net
> > > > >>>> Talend - http://www.talend.com
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >> --
> > > > >> Jean-Baptiste Onofré
> > > > >> jbonofre@apache.org
> > > > >> http://blog.nanthrax.net
> > > > >> Talend - http://www.talend.com
> > > >
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbonofre@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > >
> >
>