You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2018/03/09 21:48:50 UTC

Re: Advice on parallelizing network calls in DoFn

I will start with the "exciting futuristic" answer, which is that we
envision the new DoFn to be able to provide an automatic ExecutorService
parameters that you can use as you wish.

    new DoFn<>() {
      @ProcessElement
      public void process(ProcessContext ctx, ExecutorService
executorService) {
          ... launch some futures, put them in instance vars ...
      }

      @FinishBundle
      public void finish(...) {
         ... block on futures, output results if appropriate ...
      }
    }

This way, the Java SDK harness can use its overarching knowledge of what is
going on in a computation to, for example, share a thread pool between
different bits. This was one reason to delete IntraBundleParallelization -
it didn't allow the runner and user code to properly manage how many things
were going on concurrently. And mostly the runner should own parallelizing
to max out cores and what user code needs is asynchrony hooks that can
interact with that. However, this feature is not thoroughly considered. TBD
how much the harness itself manages blocking on outstanding requests versus
it being your responsibility in FinishBundle, etc.

I haven't explored rolling your own here, if you are willing to do the knob
tuning to get the threading acceptable for your particular use case.
Perhaps someone else can weigh in.

Kenn

On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
wrote:

> Hello all:
>
> Our team has a pipeline that make external network calls. These pipelines
> are currently super slow, and the hypothesis is that they are slow because
> we are not threading for our network calls. The github issue below provides
> some discussion around this:
>
> https://github.com/apache/beam/pull/957
>
> In beam 1.0, there was IntraBundleParallelization, which helped with this.
> However, this was removed because it didn't comply with a few BEAM
> paradigms.
>
> Questions going forward:
>
> What is advised for jobs that make blocking network calls? It seems
> bundling the elements into groups of size X prior to passing to the DoFn,
> and managing the threading within the function might work. thoughts?
> Are these types of jobs even suitable for beam?
> Are there any plans to develop features that help with this?
>
> Thanks
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Le 10 mars 2018 20:09, "Kenneth Knowles" <kl...@google.com> a écrit :

Nice! I agree that providing a CompletionStage for chaining is much better
than an ExecutorService, and very clear.

It is very feasible to add support that looks like

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

If we had this available, I think users could even experiment with this
often as it might help even where it isn't obvious.

My main hesitation is that big part of Beam is giving a basic/imperative
style of programming a DoFn that executes in a very smart
functional/parallel way. Full future-oriented programming is not explored
much outside of Javascript (and maybe Haskell) and requires greater
discipline in programming in a functional manner - if you are mutating
stuff in your callback you are going to have bugs, and then when you add
concurrency control you are going to have bad performance and deadlocks. So
I definitely wouldn't make it the default or want to spend all our support
effort on teaching advanced programming technique.


This is true but isnt it true that batch or perf related softs need
discipline as well? Add big data is often perf and I guess it can be ok
after all. But I ack beam can somehow embrace both models, reactive style
will just not be much beneficial until the full chain is reactive ;).

I need to push for some pending PRs - on beam side - next week but
hopefully i can poc what i have in mind next month to share more (it is not
only about reactive programming but mainly java 8 enhancements).



Kenn

On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

>
>
> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Have you considered drafting in detail what you think this API might look
>> like?
>>
>
>
> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
> list so didn't started to work on it much.
>
>
>>
>> If it's a radically different API, it might be more appropriate as an
>> alternative parallel Beam API rather than a replacement for the current API
>> (there is also one such fluent API in the works).
>>
>
> What I plan is to draft it on top of beam (so the "useless" case I spoke
> about before) and then propose to impl it ~natively and move it as main API
> for another major.
>
>
>
>
>>
>>
>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>>>> Kenn suggested.
>>>>
>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>> processElement and populate add needed.
>>>>
>>>
>>> This is right however in my head it was a single way movemenent to
>>> enforce the design to be reactive and not fake a reactive API with a sync
>>> and not reactive impl which is what would be done today with both support I
>>> fear.
>>>
>>>
>>>>
>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> Yes, for the dofn for instance, instead of having
>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>> it as well.
>>>>>
>>>>> This way you register an execution chain. Mixed with streams you get a
>>>>> big data java 8/9/10 API which enabkes any connectivity in a wel performing
>>>>> manner ;).
>>>>>
>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>> was suggesting, unless I'm missing something.
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>> when remote IO are involved.
>>>>>>>
>>>>>>> However it requires to break source, sdf design to use
>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>> an unified fashion.
>>>>>>>
>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>> the code?
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> What do you mean by reactive?
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>>>> we envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>>>>>> parameters that you can use as you wish.
>>>>>>>>>>
>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>       @ProcessElement
>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>> executorService) {
>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>>       @FinishBundle
>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>> of what is going on in a computation to, for example, share a thread pool
>>>>>>>>>> between different bits. This was one reason to delete
>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>
>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>> do the knob tuning to get the threading acceptable for your particular use
>>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all:
>>>>>>>>>>>
>>>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>
>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>>>>> BEAM paradigms.
>>>>>>>>>>>
>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>
>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Le 11 mars 2018 23:29, "Kenneth Knowles" <kl...@google.com> a écrit :

SPI style is a huge penalty for clarity, hence reliability. You have to
weigh the pros/cons. Unless you absolutely must use it* you are almost
always better off without it.

For DoFn:

 - not very many types of parameters


Cause you cant so it is a constraint and not a need IMHO.

 - all known statically


Same.

 - parameters are not independent to validate or execute


No real reason it is the case, technically at least.

 - no reason for them to be extended by others


Workaround is to use static utilities with a state in the fn, not that neat.



So SPI is a poor fit. The current solution already works well and addresses
your proposal easily - it was designed for it. Performance is also an issue
that you may struggle with if you try to rewrite the whole thing.


Current solution doesnt work, i cant ude my own types which would avoid
workarounds to impl custom logic as old style utilities :(.

Perf overhead is "null" since you load the spi once.



I'm not saying you can't try things out - feel free. But I just want to
warn you that it will be a large and complex task where the result is very
likely to not work out.

Kenn


* Example of a good uses: runner+option registry, non-primitive transform
translation registry, filesystem registry. Notably, the use for coder
registry is only good if it is for new types - using it to clobber existing
types creates unclear control flow.

On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> makes me think beam should maybe do 2 internals changes before moving
> forward on (s)df API changes:
>
> 1. define a beam singleton per JVM (classloader hierarchy actually but you
> get the idea I think) which can be used to store things locally for reuse -
> see 2 for an example or metrics pusher work Etienne does could benefit from
> it too
> 2. define a SPI to load (s)dofn parameter provider instead of having an
> ArgProvider which provides everything which is supported. This way you can
> use any kind of parameter and the parameterproviders can use 1. to handle
> their own state. First impl of the parameterprovider SPI would be a) state
> b) timer c) reactive handlers and potentially user parameter providers
> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>
>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Which is still a key feature for sdf but agree it can be dropped for an
>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>> based pattern. Both (which completionstage) stays compatible :).
>>>
>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> I think process context should go away completely. At that point it has
>>>> little use except for a way to send output downstream.
>>>>
>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>>> replace processcontext since it represents element and output at the same
>>>>> time no?
>>>>>
>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>
>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>> between:
>>>>>>>
>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>> ...) {
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>
>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>>
>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>
>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>       element.thenApply(...)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>
>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>> might look like?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>> main API for another major.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>>> support I fear.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Kenneth Knowles <kl...@google.com>.
SPI style is a huge penalty for clarity, hence reliability. You have to
weigh the pros/cons. Unless you absolutely must use it* you are almost
always better off without it.

For DoFn:

 - not very many types of parameters
 - all known statically
 - parameters are not independent to validate or execute
 - no reason for them to be extended by others

So SPI is a poor fit. The current solution already works well and addresses
your proposal easily - it was designed for it. Performance is also an issue
that you may struggle with if you try to rewrite the whole thing.

I'm not saying you can't try things out - feel free. But I just want to
warn you that it will be a large and complex task where the result is very
likely to not work out.

Kenn


* Example of a good uses: runner+option registry, non-primitive transform
translation registry, filesystem registry. Notably, the use for coder
registry is only good if it is for new types - using it to clobber existing
types creates unclear control flow.

On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> makes me think beam should maybe do 2 internals changes before moving
> forward on (s)df API changes:
>
> 1. define a beam singleton per JVM (classloader hierarchy actually but you
> get the idea I think) which can be used to store things locally for reuse -
> see 2 for an example or metrics pusher work Etienne does could benefit from
> it too
> 2. define a SPI to load (s)dofn parameter provider instead of having an
> ArgProvider which provides everything which is supported. This way you can
> use any kind of parameter and the parameterproviders can use 1. to handle
> their own state. First impl of the parameterprovider SPI would be a) state
> b) timer c) reactive handlers and potentially user parameter providers
> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>
>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Which is still a key feature for sdf but agree it can be dropped for an
>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>> based pattern. Both (which completionstage) stays compatible :).
>>>
>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> I think process context should go away completely. At that point it has
>>>> little use except for a way to send output downstream.
>>>>
>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>>> replace processcontext since it represents element and output at the same
>>>>> time no?
>>>>>
>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>
>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>> between:
>>>>>>>
>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>> ...) {
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>
>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>>
>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>
>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>       element.thenApply(...)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>
>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>> might look like?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>> main API for another major.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>>> support I fear.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Le 13 mars 2018 18:45, "Lukasz Cwik" <lc...@google.com> a écrit :

Thanks for the data as it is clear that the fast completion stage doubles
the overhead of the code segment that your going through and the regular
completion stage quadruples the overhead.

Its good that you also added a simple function and compared the run since
it gives relative overhead that could be used as a baseline for simple
lambdas (which are pretty common). When I ran your benchmark, I found that
the stringLength function using the beam implementation will be 2.5% faster
then the beamFastCompletionStage.

From experience I have found that performance suffers because of many
little bits of codes that seemingly don't add much but when added together
represent a pretty big system inefficiency.


Yes and no, multiply by 1000 the number of nodes of the dag - which is
already a big one - and you stay at a very high injection rate only a fes
backend can absorb.

Also dont forget this bench ignores the beam wrappers which are here in all
runners.

So at the end if beam becomes reactive it will still benefit from it, if it
doesnt it will be a bit slower but lukely not significantly enough for real
pipelines which are way under 100000ops/s/thread in general.






On Tue, Mar 13, 2018 at 2:43 AM, Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Here are some figures (small warn is I did priviledge beam a lot in this
> benchmark, a bit more than it should in a real impl, I'll say more about it
> after):
>
> I copied the code at:
>
> https://gist.github.com/rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1
>
> And results at:
>
> https://gist.github.com/rmannibucau/3d3d0d61e85d45f2959900f208e71c5e
>
> Summary is:
>
> Benchmark Mode Cnt Score Error Units
> Comparison.beam thrpt 5 217003888,048 ± 4951801,578 ops/s
> Comparison.beamCompletionStage thrpt 5 55437442,544 ± 1750845,680 ops/s
> Comparison.beamFastCompletionStage thrpt 5 128422225,642 ± 3215651,832
> ops/s
> Comparison.defaultCompletionStage thrpt 5 57284761,644 ± 1022305,051 ops/s
> Comparison.fastCompletionStage thrpt 5 202739102,801 ± 5384078,170 ops/s
> Comparison.passthrough thrpt 5 391503537,773 ± 6147672,843 ops/s
>
>
> Comments:
> 1. completionstage then*(Function|Consumer) overhead is visible and
> creating more objects than beam it is a bit slower
> 2. it is trivial to make completionstage faster than the default in all
> the "precomputed" cases beam can know about (we mentionned it earlier) and
> it falls in the "fastCompletionStage" benchmarks which are x2 adapted to
> beam case (and allows to dropp one instantiation and compare and swap logic)
> 3. the diff between the fast completionstage and beam is mainly about the
> creation of one more wrapper + lambda handling
> 4. however figures stays high (milllions ops/s!). if you add any IO or
> real computation into the mix you will converge to a sensitively close
> number
> 5. don't forget that being based on CompletionStage we can become
> reactive/async and scale more appropriately if runners embrace that and at
> the end be faster anyway (the old NIO vs BIO topic ;))
>
> To summarize: I don't think these figures means that completionstage is a
> bad fit for beam but actually the opposite even if the raw difference is
> significative. It still shows that the completionstage based throughoutput
> is high enough to fit real world batches IMHO.
>
> Just to illustrate that point, if I just do a System.getProperty("
> user.name").length() (my username being rmannibucau):
>
> @Benchmark
> public void stringLength(final Blackhole blackhole) {
>     blackhole.consume(System.getProperty("user.name").length());
> }
>
> the execution is slower than completionfuture in beam case:
>
> Comparison.stringLength             thrpt    5   42428432,508 ±
> 4555866,693  ops/s
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com/> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 18:58 GMT+01:00 Romain Manni-Bucau <rm...@gmail.com>:
> >
> > No more but can try to gather some figures and compare it to beam dofn
> overhead which should be at the same level or a bit more here since it is
> never unwrapped whereas completionfuture is a raw code chain without beam
> in the middle.
> >
> > Le 12 mars 2018 18:18, "Lukasz Cwik" <lc...@google.com> a écrit :
> >>
> >> Do you have data that supports this?
> >>
> >> Note that in reality for something like passing an element between
> DoFns, the constant in o(1) actually matters. Decreasing SDK harness
> overhead is a good thing though.
> >>
> >> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>
> >>> By itself just the overhead of instantiating a wrapper (so nothing
> with the recent JVM GC improvement done for the stream/optional usages).
> After if you use the chaining you have a light overhead but still o(1) you
> can desire to skip when doing sync code but which will enable you to run
> way faster doing IO/async code by optimizing the CPU usage properly when
> you tune your slaves/workers. So tempted to summarize it as "has an
> overhead allowing to not run slower". It doesn't prevent beam to still
> expose a synchronous API collapse at evaluation time in a single fn which
> will give you the best of both worlds.
> >>>
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
> >>>
> >>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
> >>>>
> >>>> It is expected that SDKs will have all their cores fully utilized by
> processing bundles in parallel and not by performing intrabundle
> parallelization. This allows for DoFns to be chained together via regular
> method calls because the overhead to pass a single element through all the
> DoFn's should be as minimal as possible
> >>>>
> >>>> What is the overhead of using a completion stage vs using a regular
> method call?
> >>>>
> >>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>
> >>>>>
> >>>>>
> >>>>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>
> >>>>>> makes me think beam should maybe do 2 internals changes before
> moving forward on (s)df API changes:
> >>>>>>
> >>>>>> 1. define a beam singleton per JVM (classloader hierarchy actually
> but you get the idea I think) which can be used to store things locally for
> reuse - see 2 for an example or metrics pusher work Etienne does could
> benefit from it too
> >>>>>
> >>>>>
> >>>>> I think we do need something like this, but it needs to be a bit
> more than a singleton per JVM. For one thing it needs to be at least per
> pipeline within a JVM. You might run multiple tests in a single JVM, and it
> should also be possible to run those tests in parallel without the static
> state interfering with each other. I also think the state needs to be
> addressable per step (i.e. a ParDo can look up its static state without
> caring about static state belonging to another ParDo).
> >>>>>
> >>>>>
> >>>>> Agree but you can register in a singleton the pipeline (as "ref" not
> as instance) and therefore hit the same need. +1 to have scopes (singleton,
> pipeline, thread) but it still requires a single singleton to handle
> serialization ;).
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>> 2. define a SPI to load (s)dofn parameter provider instead of
> having an ArgProvider which provides everything which is supported. This
> way you can use any kind of parameter and the parameterproviders can use 1.
> to handle their own state. First impl of the parameterprovider SPI would be
> a) state b) timer c) reactive handlers and potentially user parameter
> providers (service like which can be singleton in the scope of a "JVM"
> thanks to 1).
> >>>>>>
> >>>>>>
> >>>>>> Romain Manni-Bucau
> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
> >>>>>>
> >>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>
> >>>>>>> Yep. Introduce OutputEmitter, and Process context no longer has
> much use.
> >>>>>>>
> >>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Which is still a key feature for sdf but agree it can be dropped
> for an outputemitter pattern and the dofn moved to a plain parameters
> injection based pattern. Both (which completionstage) stays compatible :).
> >>>>>>>>
> >>>>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>>>>>
> >>>>>>>>> I think process context should go away completely. At that point
> it has little use except for a way to send output downstream.
> >>>>>>>>>
> >>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hmm, thinking out loud but completionstage should/could be
> extended to replace processcontext since it represents element and output
> at the same time no?
> >>>>>>>>>>
> >>>>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a
> écrit :
> >>>>>>>>>>>
> >>>>>>>>>>> Yea, I think it could. But it is probably more readable to not
> overload the term, plus certainly a bit simpler in implementation. So
> perhaps @AsyncElement to make it very clear.
> >>>>>>>>>>>
> >>>>>>>>>>> Kenn
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Ken, can NewDoFn distinguish at generation time the
> difference between:
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
> element, ...) {
> >>>>>>>>>>>>
> >>>>>>>>>>>> and
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public void process(@Element Input element, ...) {
> >>>>>>>>>>>>
> >>>>>>>>>>>> If not, then we would probably need separate annotations....
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <
> klk@google.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining
> is much better than an ExecutorService, and very clear.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is very feasible to add support that looks like
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
> >>>>>>>>>>>>>     @ProcessElement
> >>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
> element, ...) {
> >>>>>>>>>>>>>       element.thenApply(...)
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>>   }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If we had this available, I think users could even
> experiment with this often as it might help even where it isn't obvious.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> My main hesitation is that big part of Beam is giving a
> basic/imperative style of programming a DoFn that executes in a very smart
> functional/parallel way. Full future-oriented programming is not explored
> much outside of Javascript (and maybe Haskell) and requires greater
> discipline in programming in a functional manner - if you are mutating
> stuff in your callback you are going to have bugs, and then when you add
> concurrency control you are going to have bad performance and deadlocks. So
> I definitely wouldn't make it the default or want to spend all our support
> effort on teaching advanced programming technique.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Have you considered drafting in detail what you think this
> API might look like?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases
> - and "bugs" list so didn't started to work on it much.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If it's a radically different API, it might be more
> appropriate as an alternative parallel Beam API rather than a replacement
> for the current API (there is also one such fluent API in the works).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless"
> case I spoke about before) and then propose to impl it ~natively and move
> it as main API for another major.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8
> idiomatic one?) of what Kenn suggested.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
> might not require waiting till Beam 3.0). We can recognize new parameters
> to processElement and populate add needed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This is right however in my head it was a single way
> movemenent to enforce the design to be reactive and not fake a reactive API
> with a sync and not reactive impl which is what would be done today with
> both support I fear.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
> processcontext.element()=<T> you get a CompletionStage<T> and output gets
> it as well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
> streams you get a big data java 8/9/10 API which enabkes any connectivity
> in a wel performing manner ;).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com>
> a écrit :
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
> asynchronous activity with a callback (the callback must be registered with
> Beam, because Beam needs to know not to mark the element as done until all
> associated callbacks have completed). I think that's basically what Kenn
> was suggesting, unless I'm missing something.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and
> until bundles+combines are reactive friendly, beam will be synchronous
> whatever other parts do. Becoming reactive will enable to manage the
> threading issues properly and to have better scalability on the overall
> execution when remote IO are involved.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to
> use completionstage - or equivalent - to chain the processing properly and
> in an unified fashion.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com>
> a écrit :
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
> certain level beam is already reactive. Are you referring to a specific way
> of writing the code?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
> relax@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive?
> Would alow to scale way more without having to hardly synchronize
> multithreading. Elegant and efficient :). Beam 3?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <
> klk@google.com> a écrit :
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic"
> answer, which is that we envision the new DoFn to be able to provide an
> automatic ExecutorService parameters that you can use as you wish.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
> >>>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
> >>>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
> ExecutorService executorService) {
> >>>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in
> instance vars ...
> >>>>>>>>>>>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
> >>>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
> >>>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
> appropriate ...
> >>>>>>>>>>>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its
> overarching knowledge of what is going on in a computation to, for example,
> share a thread pool between different bits. This was one reason to delete
> IntraBundleParallelization - it didn't allow the runner and user code to
> properly manage how many things were going on concurrently. And mostly the
> runner should own parallelizing to max out cores and what user code needs
> is asynchrony hooks that can interact with that. However, this feature is
> not thoroughly considered. TBD how much the harness itself manages blocking
> on outstanding requests versus it being your responsibility in
> FinishBundle, etc.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you
> are willing to do the knob tuning to get the threading acceptable for your
> particular use case. Perhaps someone else can weigh in.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
> josh.ferge@bounceexchange.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hello all:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external
> network calls. These pipelines are currently super slow, and the hypothesis
> is that they are slow because we are not threading for our network calls.
> The github issue below provides some discussion around this:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was
> IntraBundleParallelization, which helped with this. However, this was
> removed because it didn't comply with a few BEAM paradigms.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking
> network calls? It seems bundling the elements into groups of size X prior
> to passing to the DoFn, and managing the threading within the function
> might work. thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
> >>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
> with this?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Lukasz Cwik <lc...@google.com>.
Thanks for the data as it is clear that the fast completion stage doubles
the overhead of the code segment that your going through and the regular
completion stage quadruples the overhead.

Its good that you also added a simple function and compared the run since
it gives relative overhead that could be used as a baseline for simple
lambdas (which are pretty common). When I ran your benchmark, I found that
the stringLength function using the beam implementation will be 2.5% faster
then the beamFastCompletionStage.

From experience I have found that performance suffers because of many
little bits of codes that seemingly don't add much but when added together
represent a pretty big system inefficiency.





On Tue, Mar 13, 2018 at 2:43 AM, Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Here are some figures (small warn is I did priviledge beam a lot in this
> benchmark, a bit more than it should in a real impl, I'll say more about it
> after):
>
> I copied the code at:
>
> https://gist.github.com/rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1
>
> And results at:
>
> https://gist.github.com/rmannibucau/3d3d0d61e85d45f2959900f208e71c5e
>
> Summary is:
>
> Benchmark Mode Cnt Score Error Units
> Comparison.beam thrpt 5 217003888,048 ± 4951801,578 ops/s
> Comparison.beamCompletionStage thrpt 5 55437442,544 ± 1750845,680 ops/s
> Comparison.beamFastCompletionStage thrpt 5 128422225,642 ± 3215651,832
> ops/s
> Comparison.defaultCompletionStage thrpt 5 57284761,644 ± 1022305,051 ops/s
> Comparison.fastCompletionStage thrpt 5 202739102,801 ± 5384078,170 ops/s
> Comparison.passthrough thrpt 5 391503537,773 ± 6147672,843 ops/s
>
>
> Comments:
> 1. completionstage then*(Function|Consumer) overhead is visible and
> creating more objects than beam it is a bit slower
> 2. it is trivial to make completionstage faster than the default in all
> the "precomputed" cases beam can know about (we mentionned it earlier) and
> it falls in the "fastCompletionStage" benchmarks which are x2 adapted to
> beam case (and allows to dropp one instantiation and compare and swap logic)
> 3. the diff between the fast completionstage and beam is mainly about the
> creation of one more wrapper + lambda handling
> 4. however figures stays high (milllions ops/s!). if you add any IO or
> real computation into the mix you will converge to a sensitively close
> number
> 5. don't forget that being based on CompletionStage we can become
> reactive/async and scale more appropriately if runners embrace that and at
> the end be faster anyway (the old NIO vs BIO topic ;))
>
> To summarize: I don't think these figures means that completionstage is a
> bad fit for beam but actually the opposite even if the raw difference is
> significative. It still shows that the completionstage based throughoutput
> is high enough to fit real world batches IMHO.
>
> Just to illustrate that point, if I just do a System.getProperty("
> user.name").length() (my username being rmannibucau):
>
> @Benchmark
> public void stringLength(final Blackhole blackhole) {
>     blackhole.consume(System.getProperty("user.name").length());
> }
>
> the execution is slower than completionfuture in beam case:
>
> Comparison.stringLength             thrpt    5   42428432,508 ±
> 4555866,693  ops/s
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com/> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 18:58 GMT+01:00 Romain Manni-Bucau <rm...@gmail.com>:
> >
> > No more but can try to gather some figures and compare it to beam dofn
> overhead which should be at the same level or a bit more here since it is
> never unwrapped whereas completionfuture is a raw code chain without beam
> in the middle.
> >
> > Le 12 mars 2018 18:18, "Lukasz Cwik" <lc...@google.com> a écrit :
> >>
> >> Do you have data that supports this?
> >>
> >> Note that in reality for something like passing an element between
> DoFns, the constant in o(1) actually matters. Decreasing SDK harness
> overhead is a good thing though.
> >>
> >> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>
> >>> By itself just the overhead of instantiating a wrapper (so nothing
> with the recent JVM GC improvement done for the stream/optional usages).
> After if you use the chaining you have a light overhead but still o(1) you
> can desire to skip when doing sync code but which will enable you to run
> way faster doing IO/async code by optimizing the CPU usage properly when
> you tune your slaves/workers. So tempted to summarize it as "has an
> overhead allowing to not run slower". It doesn't prevent beam to still
> expose a synchronous API collapse at evaluation time in a single fn which
> will give you the best of both worlds.
> >>>
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
> >>>
> >>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
> >>>>
> >>>> It is expected that SDKs will have all their cores fully utilized by
> processing bundles in parallel and not by performing intrabundle
> parallelization. This allows for DoFns to be chained together via regular
> method calls because the overhead to pass a single element through all the
> DoFn's should be as minimal as possible
> >>>>
> >>>> What is the overhead of using a completion stage vs using a regular
> method call?
> >>>>
> >>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>
> >>>>>
> >>>>>
> >>>>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>
> >>>>>> makes me think beam should maybe do 2 internals changes before
> moving forward on (s)df API changes:
> >>>>>>
> >>>>>> 1. define a beam singleton per JVM (classloader hierarchy actually
> but you get the idea I think) which can be used to store things locally for
> reuse - see 2 for an example or metrics pusher work Etienne does could
> benefit from it too
> >>>>>
> >>>>>
> >>>>> I think we do need something like this, but it needs to be a bit
> more than a singleton per JVM. For one thing it needs to be at least per
> pipeline within a JVM. You might run multiple tests in a single JVM, and it
> should also be possible to run those tests in parallel without the static
> state interfering with each other. I also think the state needs to be
> addressable per step (i.e. a ParDo can look up its static state without
> caring about static state belonging to another ParDo).
> >>>>>
> >>>>>
> >>>>> Agree but you can register in a singleton the pipeline (as "ref" not
> as instance) and therefore hit the same need. +1 to have scopes (singleton,
> pipeline, thread) but it still requires a single singleton to handle
> serialization ;).
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>> 2. define a SPI to load (s)dofn parameter provider instead of
> having an ArgProvider which provides everything which is supported. This
> way you can use any kind of parameter and the parameterproviders can use 1.
> to handle their own state. First impl of the parameterprovider SPI would be
> a) state b) timer c) reactive handlers and potentially user parameter
> providers (service like which can be singleton in the scope of a "JVM"
> thanks to 1).
> >>>>>>
> >>>>>>
> >>>>>> Romain Manni-Bucau
> >>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
> >>>>>>
> >>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>
> >>>>>>> Yep. Introduce OutputEmitter, and Process context no longer has
> much use.
> >>>>>>>
> >>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> Which is still a key feature for sdf but agree it can be dropped
> for an outputemitter pattern and the dofn moved to a plain parameters
> injection based pattern. Both (which completionstage) stays compatible :).
> >>>>>>>>
> >>>>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
> >>>>>>>>>
> >>>>>>>>> I think process context should go away completely. At that point
> it has little use except for a way to send output downstream.
> >>>>>>>>>
> >>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hmm, thinking out loud but completionstage should/could be
> extended to replace processcontext since it represents element and output
> at the same time no?
> >>>>>>>>>>
> >>>>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a
> écrit :
> >>>>>>>>>>>
> >>>>>>>>>>> Yea, I think it could. But it is probably more readable to not
> overload the term, plus certainly a bit simpler in implementation. So
> perhaps @AsyncElement to make it very clear.
> >>>>>>>>>>>
> >>>>>>>>>>> Kenn
> >>>>>>>>>>>
> >>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Ken, can NewDoFn distinguish at generation time the
> difference between:
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
> element, ...) {
> >>>>>>>>>>>>
> >>>>>>>>>>>> and
> >>>>>>>>>>>>
> >>>>>>>>>>>>     public void process(@Element Input element, ...) {
> >>>>>>>>>>>>
> >>>>>>>>>>>> If not, then we would probably need separate annotations....
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <
> klk@google.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining
> is much better than an ExecutorService, and very clear.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is very feasible to add support that looks like
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
> >>>>>>>>>>>>>     @ProcessElement
> >>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
> element, ...) {
> >>>>>>>>>>>>>       element.thenApply(...)
> >>>>>>>>>>>>>     }
> >>>>>>>>>>>>>   }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If we had this available, I think users could even
> experiment with this often as it might help even where it isn't obvious.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> My main hesitation is that big part of Beam is giving a
> basic/imperative style of programming a DoFn that executes in a very smart
> functional/parallel way. Full future-oriented programming is not explored
> much outside of Javascript (and maybe Haskell) and requires greater
> discipline in programming in a functional manner - if you are mutating
> stuff in your callback you are going to have bugs, and then when you add
> concurrency control you are going to have bad performance and deadlocks. So
> I definitely wouldn't make it the default or want to spend all our support
> effort on teaching advanced programming technique.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Have you considered drafting in detail what you think this
> API might look like?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases
> - and "bugs" list so didn't started to work on it much.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> If it's a radically different API, it might be more
> appropriate as an alternative parallel Beam API rather than a replacement
> for the current API (there is also one such fluent API in the works).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless"
> case I spoke about before) and then propose to impl it ~natively and move
> it as main API for another major.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8
> idiomatic one?) of what Kenn suggested.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
> might not require waiting till Beam 3.0). We can recognize new parameters
> to processElement and populate add needed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This is right however in my head it was a single way
> movemenent to enforce the design to be reactive and not fake a reactive API
> with a sync and not reactive impl which is what would be done today with
> both support I fear.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
> processcontext.element()=<T> you get a CompletionStage<T> and output gets
> it as well.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
> streams you get a big data java 8/9/10 API which enabkes any connectivity
> in a wel performing manner ;).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com>
> a écrit :
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
> asynchronous activity with a callback (the callback must be registered with
> Beam, because Beam needs to know not to mark the element as done until all
> associated callbacks have completed). I think that's basically what Kenn
> was suggesting, unless I'm missing something.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and
> until bundles+combines are reactive friendly, beam will be synchronous
> whatever other parts do. Becoming reactive will enable to manage the
> threading issues properly and to have better scalability on the overall
> execution when remote IO are involved.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to
> use completionstage - or equivalent - to chain the processing properly and
> in an unified fashion.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com>
> a écrit :
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
> certain level beam is already reactive. Are you referring to a specific way
> of writing the code?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
> relax@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive?
> Would alow to scale way more without having to hardly synchronize
> multithreading. Elegant and efficient :). Beam 3?
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <
> klk@google.com> a écrit :
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic"
> answer, which is that we envision the new DoFn to be able to provide an
> automatic ExecutorService parameters that you can use as you wish.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
> >>>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
> >>>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
> ExecutorService executorService) {
> >>>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in
> instance vars ...
> >>>>>>>>>>>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
> >>>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
> >>>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
> appropriate ...
> >>>>>>>>>>>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its
> overarching knowledge of what is going on in a computation to, for example,
> share a thread pool between different bits. This was one reason to delete
> IntraBundleParallelization - it didn't allow the runner and user code to
> properly manage how many things were going on concurrently. And mostly the
> runner should own parallelizing to max out cores and what user code needs
> is asynchrony hooks that can interact with that. However, this feature is
> not thoroughly considered. TBD how much the harness itself manages blocking
> on outstanding requests versus it being your responsibility in
> FinishBundle, etc.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you
> are willing to do the knob tuning to get the threading acceptable for your
> particular use case. Perhaps someone else can weigh in.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
> josh.ferge@bounceexchange.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hello all:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external
> network calls. These pipelines are currently super slow, and the hypothesis
> is that they are slow because we are not threading for our network calls.
> The github issue below provides some discussion around this:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was
> IntraBundleParallelization, which helped with this. However, this was
> removed because it didn't comply with a few BEAM paradigms.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking
> network calls? It seems bundling the elements into groups of size X prior
> to passing to the DoFn, and managing the threading within the function
> might work. thoughts?
> >>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
> >>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
> with this?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Here are some figures (small warn is I did priviledge beam a lot in this
benchmark, a bit more than it should in a real impl, I'll say more about it
after):

I copied the code at:

https://gist.github.com/rmannibucau/fd98fb6a10f9557613fb145c8e7e2de1

And results at:

https://gist.github.com/rmannibucau/3d3d0d61e85d45f2959900f208e71c5e

Summary is:

Benchmark Mode Cnt Score Error Units
Comparison.beam thrpt 5 217003888,048 ± 4951801,578 ops/s
Comparison.beamCompletionStage thrpt 5 55437442,544 ± 1750845,680 ops/s
Comparison.beamFastCompletionStage thrpt 5 128422225,642 ± 3215651,832 ops/s
Comparison.defaultCompletionStage thrpt 5 57284761,644 ± 1022305,051 ops/s
Comparison.fastCompletionStage thrpt 5 202739102,801 ± 5384078,170 ops/s
Comparison.passthrough thrpt 5 391503537,773 ± 6147672,843 ops/s


Comments:
1. completionstage then*(Function|Consumer) overhead is visible and
creating more objects than beam it is a bit slower
2. it is trivial to make completionstage faster than the default in all the
"precomputed" cases beam can know about (we mentionned it earlier) and it
falls in the "fastCompletionStage" benchmarks which are x2 adapted to beam
case (and allows to dropp one instantiation and compare and swap logic)
3. the diff between the fast completionstage and beam is mainly about the
creation of one more wrapper + lambda handling
4. however figures stays high (milllions ops/s!). if you add any IO or real
computation into the mix you will converge to a sensitively close number
5. don't forget that being based on CompletionStage we can become
reactive/async and scale more appropriately if runners embrace that and at
the end be faster anyway (the old NIO vs BIO topic ;))

To summarize: I don't think these figures means that completionstage is a
bad fit for beam but actually the opposite even if the raw difference is
significative. It still shows that the completionstage based throughoutput
is high enough to fit real world batches IMHO.

Just to illustrate that point, if I just do a
System.getProperty("user.name").length()
(my username being rmannibucau):

@Benchmark
public void stringLength(final Blackhole blackhole) {
    blackhole.consume(System.getProperty("user.name").length());
}

the execution is slower than completionfuture in beam case:

Comparison.stringLength             thrpt    5   42428432,508 ±
4555866,693  ops/s


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com/> | Github
<https://github.com/rmannibucau> | LinkedIn
<https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-12 18:58 GMT+01:00 Romain Manni-Bucau <rm...@gmail.com>:
>
> No more but can try to gather some figures and compare it to beam dofn
overhead which should be at the same level or a bit more here since it is
never unwrapped whereas completionfuture is a raw code chain without beam
in the middle.
>
> Le 12 mars 2018 18:18, "Lukasz Cwik" <lc...@google.com> a écrit :
>>
>> Do you have data that supports this?
>>
>> Note that in reality for something like passing an element between
DoFns, the constant in o(1) actually matters. Decreasing SDK harness
overhead is a good thing though.
>>
>> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>
>>> By itself just the overhead of instantiating a wrapper (so nothing with
the recent JVM GC improvement done for the stream/optional usages). After
if you use the chaining you have a light overhead but still o(1) you can
desire to skip when doing sync code but which will enable you to run way
faster doing IO/async code by optimizing the CPU usage properly when you
tune your slaves/workers. So tempted to summarize it as "has an overhead
allowing to not run slower". It doesn't prevent beam to still expose a
synchronous API collapse at evaluation time in a single fn which will give
you the best of both worlds.
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>>>
>>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>>>>
>>>> It is expected that SDKs will have all their cores fully utilized by
processing bundles in parallel and not by performing intrabundle
parallelization. This allows for DoFns to be chained together via regular
method calls because the overhead to pass a single element through all the
DoFn's should be as minimal as possible
>>>>
>>>> What is the overhead of using a completion stage vs using a regular
method call?
>>>>
>>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>
>>>>>
>>>>>
>>>>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>> makes me think beam should maybe do 2 internals changes before
moving forward on (s)df API changes:
>>>>>>
>>>>>> 1. define a beam singleton per JVM (classloader hierarchy actually
but you get the idea I think) which can be used to store things locally for
reuse - see 2 for an example or metrics pusher work Etienne does could
benefit from it too
>>>>>
>>>>>
>>>>> I think we do need something like this, but it needs to be a bit more
than a singleton per JVM. For one thing it needs to be at least per
pipeline within a JVM. You might run multiple tests in a single JVM, and it
should also be possible to run those tests in parallel without the static
state interfering with each other. I also think the state needs to be
addressable per step (i.e. a ParDo can look up its static state without
caring about static state belonging to another ParDo).
>>>>>
>>>>>
>>>>> Agree but you can register in a singleton the pipeline (as "ref" not
as instance) and therefore hit the same need. +1 to have scopes (singleton,
pipeline, thread) but it still requires a single singleton to handle
serialization ;).
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>> 2. define a SPI to load (s)dofn parameter provider instead of having
an ArgProvider which provides everything which is supported. This way you
can use any kind of parameter and the parameterproviders can use 1. to
handle their own state. First impl of the parameterprovider SPI would be a)
state b) timer c) reactive handlers and potentially user parameter
providers (service like which can be singleton in the scope of a "JVM"
thanks to 1).
>>>>>>
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>>>>>>
>>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>
>>>>>>> Yep. Introduce OutputEmitter, and Process context no longer has
much use.
>>>>>>>
>>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Which is still a key feature for sdf but agree it can be dropped
for an outputemitter pattern and the dofn moved to a plain parameters
injection based pattern. Both (which completionstage) stays compatible :).
>>>>>>>>
>>>>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>> I think process context should go away completely. At that point
it has little use except for a way to send output downstream.
>>>>>>>>>
>>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hmm, thinking out loud but completionstage should/could be
extended to replace processcontext since it represents element and output
at the same time no?
>>>>>>>>>>
>>>>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a
écrit :
>>>>>>>>>>>
>>>>>>>>>>> Yea, I think it could. But it is probably more readable to not
overload the term, plus certainly a bit simpler in implementation. So
perhaps @AsyncElement to make it very clear.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
between:
>>>>>>>>>>>>
>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
element, ...) {
>>>>>>>>>>>>
>>>>>>>>>>>> and
>>>>>>>>>>>>
>>>>>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>>>>>
>>>>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <
klk@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining
is much better than an ExecutorService, and very clear.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>>>
>>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
element, ...) {
>>>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we had this available, I think users could even experiment
with this often as it might help even where it isn't obvious.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My main hesitation is that big part of Beam is giving a
basic/imperative style of programming a DoFn that executes in a very smart
functional/parallel way. Full future-oriented programming is not explored
much outside of Javascript (and maybe Haskell) and requires greater
discipline in programming in a functional manner - if you are mutating
stuff in your callback you are going to have bugs, and then when you add
concurrency control you are going to have bad performance and deadlocks. So
I definitely wouldn't make it the default or want to spend all our support
effort on teaching advanced programming technique.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Have you considered drafting in detail what you think this
API might look like?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases -
and "bugs" list so didn't started to work on it much.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If it's a radically different API, it might be more
appropriate as an alternative parallel Beam API rather than a replacement
for the current API (there is also one such fluent API in the works).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless"
case I spoke about before) and then propose to impl it ~natively and move
it as main API for another major.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
one?) of what Kenn suggested.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
might not require waiting till Beam 3.0). We can recognize new parameters
to processElement and populate add needed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is right however in my head it was a single way
movemenent to enforce the design to be reactive and not fake a reactive API
with a sync and not reactive impl which is what would be done today with
both support I fear.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
processcontext.element()=<T> you get a CompletionStage<T> and output gets
it as well.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
streams you get a big data java 8/9/10 API which enabkes any connectivity
in a wel performing manner ;).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
écrit :
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
asynchronous activity with a callback (the callback must be registered with
Beam, because Beam needs to know not to mark the element as done until all
associated callbacks have completed). I think that's basically what Kenn
was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and
until bundles+combines are reactive friendly, beam will be synchronous
whatever other parts do. Becoming reactive will enable to manage the
threading issues properly and to have better scalability on the overall
execution when remote IO are involved.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
completionstage - or equivalent - to chain the processing properly and in
an unified fashion.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com>
a écrit :
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
certain level beam is already reactive. Are you referring to a specific way
of writing the code?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive?
Would alow to scale way more without having to hardly synchronize
multithreading. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <
klk@google.com> a écrit :
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
which is that we envision the new DoFn to be able to provide an automatic
ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in
instance vars ...
>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
appropriate ...
>>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its
overarching knowledge of what is going on in a computation to, for example,
share a thread pool between different bits. This was one reason to delete
IntraBundleParallelization - it didn't allow the runner and user code to
properly manage how many things were going on concurrently. And mostly the
runner should own parallelizing to max out cores and what user code needs
is asynchrony hooks that can interact with that. However, this feature is
not thoroughly considered. TBD how much the harness itself manages blocking
on outstanding requests versus it being your responsibility in
FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you
are willing to do the knob tuning to get the threading acceptable for your
particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
calls. These pipelines are currently super slow, and the hypothesis is that
they are slow because we are not threading for our network calls. The
github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
which helped with this. However, this was removed because it didn't comply
with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking
network calls? It seems bundling the elements into groups of size X prior
to passing to the DoFn, and managing the threading within the function
might work. thoughts?
>>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
with this?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
No more but can try to gather some figures and compare it to beam dofn
overhead which should be at the same level or a bit more here since it is
never unwrapped whereas completionfuture is a raw code chain without beam
in the middle.

Le 12 mars 2018 18:18, "Lukasz Cwik" <lc...@google.com> a écrit :

> Do you have data that supports this?
>
> Note that in reality for something like passing an element between DoFns,
> the constant in o(1) actually matters. Decreasing SDK harness overhead is a
> good thing though.
>
> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
>
>> By itself just the overhead of instantiating a wrapper (so nothing with
>> the recent JVM GC improvement done for the stream/optional usages). After
>> if you use the chaining you have a light overhead but still o(1) you can
>> desire to skip when doing sync code but which will enable you to run way
>> faster doing IO/async code by optimizing the CPU usage properly when you
>> tune your slaves/workers. So tempted to summarize it as "has an overhead
>> allowing to not run slower". It doesn't prevent beam to still expose a
>> synchronous API collapse at evaluation time in a single fn which will give
>> you the best of both worlds.
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>>
>>> It is expected that SDKs will have all their cores fully utilized by
>>> processing bundles in parallel and not by performing intrabundle
>>> parallelization. This allows for DoFns to be chained together via regular
>>> method calls because the overhead to pass a single element through all the
>>> DoFn's should be as minimal as possible
>>>
>>> What is the overhead of using a completion stage vs using a regular
>>> method call?
>>>
>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> makes me think beam should maybe do 2 internals changes before moving
>>>>> forward on (s)df API changes:
>>>>>
>>>>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>>>>> you get the idea I think) which can be used to store things locally for
>>>>> reuse - see 2 for an example or metrics pusher work Etienne does could
>>>>> benefit from it too
>>>>>
>>>>
>>>> I think we do need something like this, but it needs to be a bit more
>>>> than a singleton per JVM. For one thing it needs to be at least per
>>>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>>>> should also be possible to run those tests in parallel without the static
>>>> state interfering with each other. I also think the state needs to be
>>>> addressable per step (i.e. a ParDo can look up its static state without
>>>> caring about static state belonging to another ParDo).
>>>>
>>>>
>>>> Agree but you can register in a singleton the pipeline (as "ref" not as
>>>> instance) and therefore hit the same need. +1 to have scopes (singleton,
>>>> pipeline, thread) but it still requires a single singleton to handle
>>>> serialization ;).
>>>>
>>>>
>>>>
>>>>
>>>> 2. define a SPI to load (s)dofn parameter provider instead of having an
>>>>> ArgProvider which provides everything which is supported. This way you can
>>>>> use any kind of parameter and the parameterproviders can use 1. to handle
>>>>> their own state. First impl of the parameterprovider SPI would be a) state
>>>>> b) timer c) reactive handlers and potentially user parameter providers
>>>>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>>>>
>>>>>
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>
>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> Yep. Introduce OutputEmitter, and Process context no longer has much
>>>>>> use.
>>>>>>
>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> Which is still a key feature for sdf but agree it can be dropped for
>>>>>>> an outputemitter pattern and the dofn moved to a plain parameters injection
>>>>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>>>>
>>>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>>> I think process context should go away completely. At that point it
>>>>>>>> has little use except for a way to send output downstream.
>>>>>>>>
>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, thinking out loud but completionstage should/could be
>>>>>>>>> extended to replace processcontext since it represents element and output
>>>>>>>>> at the same time no?
>>>>>>>>>
>>>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>> écrit :
>>>>>>>>>
>>>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>>>>> between:
>>>>>>>>>>>
>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>> element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>>>
>>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>>
>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>>> element, ...) {
>>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>>>
>>>>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>>>>>> functional/parallel way. Full future-oriented programming is
>>>>>>>>>>>> not explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you considered drafting in detail what you think this
>>>>>>>>>>>>>> API might look like?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases -
>>>>>>>>>>>>> and "bugs" list so didn't started to work on it much.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If it's a radically different API, it might be more
>>>>>>>>>>>>>> appropriate as an alternative parallel Beam API rather than a replacement
>>>>>>>>>>>>>> for the current API (there is also one such fluent API in the works).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless"
>>>>>>>>>>>>> case I spoke about before) and then propose to impl it ~natively and move
>>>>>>>>>>>>> it as main API for another major.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new parameters
>>>>>>>>>>>>>>>> to processElement and populate add needed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is right however in my head it was a single way
>>>>>>>>>>>>>>> movemenent to enforce the design to be reactive and not fake a reactive API
>>>>>>>>>>>>>>> with a sync and not reactive impl which is what would be done today with
>>>>>>>>>>>>>>> both support I fear.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
>>>>>>>>>>>>>>>>> streams you get a big data java 8/9/10 API which enabkes any connectivity
>>>>>>>>>>>>>>>>> in a wel performing manner ;).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
>>>>>>>>>>>>>>>>>>> certain level beam is already reactive. Are you referring to a specific way
>>>>>>>>>>>>>>>>>>> of writing the code?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <
>>>>>>>>>>>>>>>>>>>>> klk@google.com> a écrit :
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in
>>>>>>>>>>>>>>>>>>>>>> instance vars ...
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its
>>>>>>>>>>>>>>>>>>>>>> overarching knowledge of what is going on in a computation to, for example,
>>>>>>>>>>>>>>>>>>>>>> share a thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because it didn't comply
>>>>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
>>>>>>>>>>>>>>>>>>>>>>> with this?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Lukasz Cwik <lc...@google.com>.
Do you have data that supports this?

Note that in reality for something like passing an element between DoFns,
the constant in o(1) actually matters. Decreasing SDK harness overhead is a
good thing though.

On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <rm...@gmail.com>
wrote:

> By itself just the overhead of instantiating a wrapper (so nothing with
> the recent JVM GC improvement done for the stream/optional usages). After
> if you use the chaining you have a light overhead but still o(1) you can
> desire to skip when doing sync code but which will enable you to run way
> faster doing IO/async code by optimizing the CPU usage properly when you
> tune your slaves/workers. So tempted to summarize it as "has an overhead
> allowing to not run slower". It doesn't prevent beam to still expose a
> synchronous API collapse at evaluation time in a single fn which will give
> you the best of both worlds.
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>
>> It is expected that SDKs will have all their cores fully utilized by
>> processing bundles in parallel and not by performing intrabundle
>> parallelization. This allows for DoFns to be chained together via regular
>> method calls because the overhead to pass a single element through all the
>> DoFn's should be as minimal as possible
>>
>> What is the overhead of using a completion stage vs using a regular
>> method call?
>>
>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
>> rmannibucau@gmail.com> wrote:
>>
>>>
>>>
>>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>
>>>
>>>
>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>> makes me think beam should maybe do 2 internals changes before moving
>>>> forward on (s)df API changes:
>>>>
>>>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>>>> you get the idea I think) which can be used to store things locally for
>>>> reuse - see 2 for an example or metrics pusher work Etienne does could
>>>> benefit from it too
>>>>
>>>
>>> I think we do need something like this, but it needs to be a bit more
>>> than a singleton per JVM. For one thing it needs to be at least per
>>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>>> should also be possible to run those tests in parallel without the static
>>> state interfering with each other. I also think the state needs to be
>>> addressable per step (i.e. a ParDo can look up its static state without
>>> caring about static state belonging to another ParDo).
>>>
>>>
>>> Agree but you can register in a singleton the pipeline (as "ref" not as
>>> instance) and therefore hit the same need. +1 to have scopes (singleton,
>>> pipeline, thread) but it still requires a single singleton to handle
>>> serialization ;).
>>>
>>>
>>>
>>>
>>> 2. define a SPI to load (s)dofn parameter provider instead of having an
>>>> ArgProvider which provides everything which is supported. This way you can
>>>> use any kind of parameter and the parameterproviders can use 1. to handle
>>>> their own state. First impl of the parameterprovider SPI would be a) state
>>>> b) timer c) reactive handlers and potentially user parameter providers
>>>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>>>
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>> <http://rmannibucau.wordpress.com> | Github
>>>> <https://github.com/rmannibucau> | LinkedIn
>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>
>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> Yep. Introduce OutputEmitter, and Process context no longer has much
>>>>> use.
>>>>>
>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>> Which is still a key feature for sdf but agree it can be dropped for
>>>>>> an outputemitter pattern and the dofn moved to a plain parameters injection
>>>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>>>
>>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>
>>>>>>> I think process context should go away completely. At that point it
>>>>>>> has little use except for a way to send output downstream.
>>>>>>>
>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm, thinking out loud but completionstage should/could be extended
>>>>>>>> to replace processcontext since it represents element and output at the
>>>>>>>> same time no?
>>>>>>>>
>>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>>
>>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>>>> between:
>>>>>>>>>>
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>>> ...) {
>>>>>>>>>>
>>>>>>>>>> and
>>>>>>>>>>
>>>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>>>
>>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>>
>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>
>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>>>>>>>>> element, ...) {
>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>>
>>>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>>>>> functional/parallel way. Full future-oriented programming is
>>>>>>>>>>> not explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>>> might look like?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases -
>>>>>>>>>>>> and "bugs" list so didn't started to work on it much.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> If it's a radically different API, it might be more
>>>>>>>>>>>>> appropriate as an alternative parallel Beam API rather than a replacement
>>>>>>>>>>>>> for the current API (there is also one such fluent API in the works).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless"
>>>>>>>>>>>> case I spoke about before) and then propose to impl it ~natively and move
>>>>>>>>>>>> it as main API for another major.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new parameters
>>>>>>>>>>>>>>> to processElement and populate add needed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is right however in my head it was a single way
>>>>>>>>>>>>>> movemenent to enforce the design to be reactive and not fake a reactive API
>>>>>>>>>>>>>> with a sync and not reactive impl which is what would be done today with
>>>>>>>>>>>>>> both support I fear.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
>>>>>>>>>>>>>>>> streams you get a big data java 8/9/10 API which enabkes any connectivity
>>>>>>>>>>>>>>>> in a wel performing manner ;).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
>>>>>>>>>>>>>>>>>> certain level beam is already reactive. Are you referring to a specific way
>>>>>>>>>>>>>>>>>> of writing the code?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in
>>>>>>>>>>>>>>>>>>>>> instance vars ...
>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because it didn't comply
>>>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
>>>>>>>>>>>>>>>>>>>>>> with this?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>
>>>
>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
By itself just the overhead of instantiating a wrapper (so nothing with the
recent JVM GC improvement done for the stream/optional usages). After if
you use the chaining you have a light overhead but still o(1) you can
desire to skip when doing sync code but which will enable you to run way
faster doing IO/async code by optimizing the CPU usage properly when you
tune your slaves/workers. So tempted to summarize it as "has an overhead
allowing to not run slower". It doesn't prevent beam to still expose a
synchronous API collapse at evaluation time in a single fn which will give
you the best of both worlds.


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:

> It is expected that SDKs will have all their cores fully utilized by
> processing bundles in parallel and not by performing intrabundle
> parallelization. This allows for DoFns to be chained together via regular
> method calls because the overhead to pass a single element through all the
> DoFn's should be as minimal as possible
>
> What is the overhead of using a completion stage vs using a regular method
> call?
>
> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
> rmannibucau@gmail.com> wrote:
>
>>
>>
>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>>
>>
>>
>>
>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> makes me think beam should maybe do 2 internals changes before moving
>>> forward on (s)df API changes:
>>>
>>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>>> you get the idea I think) which can be used to store things locally for
>>> reuse - see 2 for an example or metrics pusher work Etienne does could
>>> benefit from it too
>>>
>>
>> I think we do need something like this, but it needs to be a bit more
>> than a singleton per JVM. For one thing it needs to be at least per
>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>> should also be possible to run those tests in parallel without the static
>> state interfering with each other. I also think the state needs to be
>> addressable per step (i.e. a ParDo can look up its static state without
>> caring about static state belonging to another ParDo).
>>
>>
>> Agree but you can register in a singleton the pipeline (as "ref" not as
>> instance) and therefore hit the same need. +1 to have scopes (singleton,
>> pipeline, thread) but it still requires a single singleton to handle
>> serialization ;).
>>
>>
>>
>>
>> 2. define a SPI to load (s)dofn parameter provider instead of having an
>>> ArgProvider which provides everything which is supported. This way you can
>>> use any kind of parameter and the parameterproviders can use 1. to handle
>>> their own state. First impl of the parameterprovider SPI would be a) state
>>> b) timer c) reactive handlers and potentially user parameter providers
>>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Yep. Introduce OutputEmitter, and Process context no longer has much
>>>> use.
>>>>
>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> Which is still a key feature for sdf but agree it can be dropped for
>>>>> an outputemitter pattern and the dofn moved to a plain parameters injection
>>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>>
>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>> I think process context should go away completely. At that point it
>>>>>> has little use except for a way to send output downstream.
>>>>>>
>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm, thinking out loud but completionstage should/could be extended
>>>>>>> to replace processcontext since it represents element and output at the
>>>>>>> same time no?
>>>>>>>
>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>
>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>>> between:
>>>>>>>>>
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>> ...) {
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>>
>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>
>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>
>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>     @ProcessElement
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>>> ...) {
>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>
>>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>> might look like?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case
>>>>>>>>>>> I spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>>>> main API for another major.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new parameters
>>>>>>>>>>>>>> to processElement and populate add needed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is right however in my head it was a single way
>>>>>>>>>>>>> movemenent to enforce the design to be reactive and not fake a reactive API
>>>>>>>>>>>>> with a sync and not reactive impl which is what would be done today with
>>>>>>>>>>>>> both support I fear.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>> relax@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because it didn't comply
>>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>
>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Lukasz Cwik <lc...@google.com>.
It is expected that SDKs will have all their cores fully utilized by
processing bundles in parallel and not by performing intrabundle
parallelization. This allows for DoFns to be chained together via regular
method calls because the overhead to pass a single element through all the
DoFn's should be as minimal as possible

What is the overhead of using a completion stage vs using a regular method
call?

On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <rm...@gmail.com>
wrote:

>
>
> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>
>
>
>
> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> makes me think beam should maybe do 2 internals changes before moving
>> forward on (s)df API changes:
>>
>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>> you get the idea I think) which can be used to store things locally for
>> reuse - see 2 for an example or metrics pusher work Etienne does could
>> benefit from it too
>>
>
> I think we do need something like this, but it needs to be a bit more than
> a singleton per JVM. For one thing it needs to be at least per pipeline
> within a JVM. You might run multiple tests in a single JVM, and it should
> also be possible to run those tests in parallel without the static state
> interfering with each other. I also think the state needs to be addressable
> per step (i.e. a ParDo can look up its static state without caring about
> static state belonging to another ParDo).
>
>
> Agree but you can register in a singleton the pipeline (as "ref" not as
> instance) and therefore hit the same need. +1 to have scopes (singleton,
> pipeline, thread) but it still requires a single singleton to handle
> serialization ;).
>
>
>
>
> 2. define a SPI to load (s)dofn parameter provider instead of having an
>> ArgProvider which provides everything which is supported. This way you can
>> use any kind of parameter and the parameterproviders can use 1. to handle
>> their own state. First impl of the parameterprovider SPI would be a) state
>> b) timer c) reactive handlers and potentially user parameter providers
>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>>
>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
>>> wrote:
>>>
>>>> Which is still a key feature for sdf but agree it can be dropped for an
>>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>
>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>
>>>>> I think process context should go away completely. At that point it
>>>>> has little use except for a way to send output downstream.
>>>>>
>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>> Hmm, thinking out loud but completionstage should/could be extended
>>>>>> to replace processcontext since it represents element and output at the
>>>>>> same time no?
>>>>>>
>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>
>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>> between:
>>>>>>>>
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>
>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>
>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>
>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>> ...) {
>>>>>>>>>       element.thenApply(...)
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>>
>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>
>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>> might look like?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case
>>>>>>>>>> I spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>>> main API for another major.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>>>> support I fear.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because it didn't comply
>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :




On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> makes me think beam should maybe do 2 internals changes before moving
> forward on (s)df API changes:
>
> 1. define a beam singleton per JVM (classloader hierarchy actually but you
> get the idea I think) which can be used to store things locally for reuse -
> see 2 for an example or metrics pusher work Etienne does could benefit from
> it too
>

I think we do need something like this, but it needs to be a bit more than
a singleton per JVM. For one thing it needs to be at least per pipeline
within a JVM. You might run multiple tests in a single JVM, and it should
also be possible to run those tests in parallel without the static state
interfering with each other. I also think the state needs to be addressable
per step (i.e. a ParDo can look up its static state without caring about
static state belonging to another ParDo).


Agree but you can register in a singleton the pipeline (as "ref" not as
instance) and therefore hit the same need. +1 to have scopes (singleton,
pipeline, thread) but it still requires a single singleton to handle
serialization ;).




2. define a SPI to load (s)dofn parameter provider instead of having an
> ArgProvider which provides everything which is supported. This way you can
> use any kind of parameter and the parameterproviders can use 1. to handle
> their own state. First impl of the parameterprovider SPI would be a) state
> b) timer c) reactive handlers and potentially user parameter providers
> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>
>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Which is still a key feature for sdf but agree it can be dropped for an
>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>> based pattern. Both (which completionstage) stays compatible :).
>>>
>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> I think process context should go away completely. At that point it has
>>>> little use except for a way to send output downstream.
>>>>
>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>>> replace processcontext since it represents element and output at the same
>>>>> time no?
>>>>>
>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>
>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>> between:
>>>>>>>
>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>> ...) {
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>
>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>>
>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>
>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>       element.thenApply(...)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>
>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>> might look like?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>> main API for another major.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>>> support I fear.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> makes me think beam should maybe do 2 internals changes before moving
> forward on (s)df API changes:
>
> 1. define a beam singleton per JVM (classloader hierarchy actually but you
> get the idea I think) which can be used to store things locally for reuse -
> see 2 for an example or metrics pusher work Etienne does could benefit from
> it too
>

I think we do need something like this, but it needs to be a bit more than
a singleton per JVM. For one thing it needs to be at least per pipeline
within a JVM. You might run multiple tests in a single JVM, and it should
also be possible to run those tests in parallel without the static state
interfering with each other. I also think the state needs to be addressable
per step (i.e. a ParDo can look up its static state without caring about
static state belonging to another ParDo).


2. define a SPI to load (s)dofn parameter provider instead of having an
> ArgProvider which provides everything which is supported. This way you can
> use any kind of parameter and the parameterproviders can use 1. to handle
> their own state. First impl of the parameterprovider SPI would be a) state
> b) timer c) reactive handlers and potentially user parameter providers
> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>
>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Which is still a key feature for sdf but agree it can be dropped for an
>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>> based pattern. Both (which completionstage) stays compatible :).
>>>
>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> I think process context should go away completely. At that point it has
>>>> little use except for a way to send output downstream.
>>>>
>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>>> replace processcontext since it represents element and output at the same
>>>>> time no?
>>>>>
>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>
>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>> between:
>>>>>>>
>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>> ...) {
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>
>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>>
>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>
>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>       element.thenApply(...)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>
>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>> might look like?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>>> main API for another major.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>>> support I fear.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com>
>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network calls. The
>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
makes me think beam should maybe do 2 internals changes before moving
forward on (s)df API changes:

1. define a beam singleton per JVM (classloader hierarchy actually but you
get the idea I think) which can be used to store things locally for reuse -
see 2 for an example or metrics pusher work Etienne does could benefit from
it too
2. define a SPI to load (s)dofn parameter provider instead of having an
ArgProvider which provides everything which is supported. This way you can
use any kind of parameter and the parameterproviders can use 1. to handle
their own state. First impl of the parameterprovider SPI would be a) state
b) timer c) reactive handlers and potentially user parameter providers
(service like which can be singleton in the scope of a "JVM" thanks to 1).


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:

> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>
> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> Which is still a key feature for sdf but agree it can be dropped for an
>> outputemitter pattern and the dofn moved to a plain parameters injection
>> based pattern. Both (which completionstage) stays compatible :).
>>
>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>
>>> I think process context should go away completely. At that point it has
>>> little use except for a way to send output downstream.
>>>
>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>>> wrote:
>>>
>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>> replace processcontext since it represents element and output at the same
>>>> time no?
>>>>
>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>
>>>>> Yea, I think it could. But it is probably more readable to not
>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>> perhaps @AsyncElement to make it very clear.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>> between:
>>>>>>
>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>> ...) {
>>>>>>
>>>>>> and
>>>>>>
>>>>>>     public void process(@Element Input element, ...) {
>>>>>>
>>>>>> If not, then we would probably need separate annotations....
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>
>>>>>>> It is very feasible to add support that looks like
>>>>>>>
>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>     @ProcessElement
>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>> ...) {
>>>>>>>       element.thenApply(...)
>>>>>>>     }
>>>>>>>   }
>>>>>>>
>>>>>>> If we had this available, I think users could even experiment with
>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>
>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>
>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>> might look like?
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> If it's a radically different API, it might be more appropriate as
>>>>>>>>> an alternative parallel Beam API rather than a replacement for the current
>>>>>>>>> API (there is also one such fluent API in the works).
>>>>>>>>>
>>>>>>>>
>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>>> main API for another major.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>
>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>
>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API with a
>>>>>>>>>> sync and not reactive impl which is what would be done today with both
>>>>>>>>>> support I fear.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>>> it as well.
>>>>>>>>>>>>
>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>
>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>> écrit :
>>>>>>>>>>>>
>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow
>>>>>>>>>>>>>>>> to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the function might
>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
Yep. Introduce OutputEmitter, and Process context no longer has much use.

On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Which is still a key feature for sdf but agree it can be dropped for an
> outputemitter pattern and the dofn moved to a plain parameters injection
> based pattern. Both (which completionstage) stays compatible :).
>
> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>
>> I think process context should go away completely. At that point it has
>> little use except for a way to send output downstream.
>>
>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Hmm, thinking out loud but completionstage should/could be extended to
>>> replace processcontext since it represents element and output at the same
>>> time no?
>>>
>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>
>>>> Yea, I think it could. But it is probably more readable to not overload
>>>> the term, plus certainly a bit simpler in implementation. So perhaps
>>>> @AsyncElement to make it very clear.
>>>>
>>>> Kenn
>>>>
>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Ken, can NewDoFn distinguish at generation time the difference between:
>>>>>
>>>>>     public void process(@Element CompletionStage<InputT> element, ...)
>>>>> {
>>>>>
>>>>> and
>>>>>
>>>>>     public void process(@Element Input element, ...) {
>>>>>
>>>>> If not, then we would probably need separate annotations....
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>> better than an ExecutorService, and very clear.
>>>>>>
>>>>>> It is very feasible to add support that looks like
>>>>>>
>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>     @ProcessElement
>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>> ...) {
>>>>>>       element.thenApply(...)
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>> If we had this available, I think users could even experiment with
>>>>>> this often as it might help even where it isn't obvious.
>>>>>>
>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>> you add concurrency control you are going to have bad performance and
>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>
>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>> might look like?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> If it's a radically different API, it might be more appropriate as
>>>>>>>> an alternative parallel Beam API rather than a replacement for the current
>>>>>>>> API (there is also one such fluent API in the works).
>>>>>>>>
>>>>>>>
>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>>> main API for another major.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>
>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is right however in my head it was a single way movemenent to
>>>>>>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>>>>>>> and not reactive impl which is what would be done today with both support I
>>>>>>>>> fear.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>>> it as well.
>>>>>>>>>>>
>>>>>>>>>>> This way you register an execution chain. Mixed with streams you
>>>>>>>>>>> get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>
>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow
>>>>>>>>>>>>>>> to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls?
>>>>>>>>>>>>>>>>> It seems bundling the elements into groups of size X prior to passing to
>>>>>>>>>>>>>>>>> the DoFn, and managing the threading within the function might work.
>>>>>>>>>>>>>>>>> thoughts?
>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Which is still a key feature for sdf but agree it can be dropped for an
outputemitter pattern and the dofn moved to a plain parameters injection
based pattern. Both (which completionstage) stays compatible :).

Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :

> I think process context should go away completely. At that point it has
> little use except for a way to send output downstream.
>
> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> Hmm, thinking out loud but completionstage should/could be extended to
>> replace processcontext since it represents element and output at the same
>> time no?
>>
>> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>>
>>> Yea, I think it could. But it is probably more readable to not overload
>>> the term, plus certainly a bit simpler in implementation. So perhaps
>>> @AsyncElement to make it very clear.
>>>
>>> Kenn
>>>
>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Ken, can NewDoFn distinguish at generation time the difference between:
>>>>
>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>
>>>> and
>>>>
>>>>     public void process(@Element Input element, ...) {
>>>>
>>>> If not, then we would probably need separate annotations....
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com>
>>>> wrote:
>>>>
>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>> better than an ExecutorService, and very clear.
>>>>>
>>>>> It is very feasible to add support that looks like
>>>>>
>>>>>   new DoFn<InputT, OutputT>() {
>>>>>     @ProcessElement
>>>>>     public void process(@Element CompletionStage<InputT> element, ...)
>>>>> {
>>>>>       element.thenApply(...)
>>>>>     }
>>>>>   }
>>>>>
>>>>> If we had this available, I think users could even experiment with
>>>>> this often as it might help even where it isn't obvious.
>>>>>
>>>>> My main hesitation is that big part of Beam is giving a
>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>> functional/parallel way. Full future-oriented programming is not
>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>> greater discipline in programming in a functional manner - if you are
>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>> you add concurrency control you are going to have bad performance and
>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>> all our support effort on teaching advanced programming technique.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>
>>>>>>> Have you considered drafting in detail what you think this API might
>>>>>>> look like?
>>>>>>>
>>>>>>
>>>>>>
>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> If it's a radically different API, it might be more appropriate as
>>>>>>> an alternative parallel Beam API rather than a replacement for the current
>>>>>>> API (there is also one such fluent API in the works).
>>>>>>>
>>>>>>
>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>>> main API for another major.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>
>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>>>>> what Kenn suggested.
>>>>>>>>>
>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>>> processElement and populate add needed.
>>>>>>>>>
>>>>>>>>
>>>>>>>> This is right however in my head it was a single way movemenent to
>>>>>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>>>>>> and not reactive impl which is what would be done today with both support I
>>>>>>>> fear.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>>> it as well.
>>>>>>>>>>
>>>>>>>>>> This way you register an execution chain. Mixed with streams you
>>>>>>>>>> get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>>> performing manner ;).
>>>>>>>>>>
>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>
>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>
>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>
>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>>>
>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow
>>>>>>>>>>>>>> to scale way more without having to hardly synchronize multithreading.
>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>           ... launch some futures, put them in instance vars
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>          ... block on futures, output results if appropriate
>>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing
>>>>>>>>>>>>>>> to do the knob tuning to get the threading acceptable for your particular
>>>>>>>>>>>>>>> use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls?
>>>>>>>>>>>>>>>> It seems bundling the elements into groups of size X prior to passing to
>>>>>>>>>>>>>>>> the DoFn, and managing the threading within the function might work.
>>>>>>>>>>>>>>>> thoughts?
>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>
>>>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
I think process context should go away completely. At that point it has
little use except for a way to send output downstream.

On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Hmm, thinking out loud but completionstage should/could be extended to
> replace processcontext since it represents element and output at the same
> time no?
>
> Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :
>
>> Yea, I think it could. But it is probably more readable to not overload
>> the term, plus certainly a bit simpler in implementation. So perhaps
>> @AsyncElement to make it very clear.
>>
>> Kenn
>>
>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Ken, can NewDoFn distinguish at generation time the difference between:
>>>
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>
>>> and
>>>
>>>     public void process(@Element Input element, ...) {
>>>
>>> If not, then we would probably need separate annotations....
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>> better than an ExecutorService, and very clear.
>>>>
>>>> It is very feasible to add support that looks like
>>>>
>>>>   new DoFn<InputT, OutputT>() {
>>>>     @ProcessElement
>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>       element.thenApply(...)
>>>>     }
>>>>   }
>>>>
>>>> If we had this available, I think users could even experiment with this
>>>> often as it might help even where it isn't obvious.
>>>>
>>>> My main hesitation is that big part of Beam is giving a
>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>> functional/parallel way. Full future-oriented programming is not
>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>> greater discipline in programming in a functional manner - if you are
>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>> you add concurrency control you are going to have bad performance and
>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>> all our support effort on teaching advanced programming technique.
>>>>
>>>> Kenn
>>>>
>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> Have you considered drafting in detail what you think this API might
>>>>>> look like?
>>>>>>
>>>>>
>>>>>
>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>> "bugs" list so didn't started to work on it much.
>>>>>
>>>>>
>>>>>>
>>>>>> If it's a radically different API, it might be more appropriate as an
>>>>>> alternative parallel Beam API rather than a replacement for the current API
>>>>>> (there is also one such fluent API in the works).
>>>>>>
>>>>>
>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>>> main API for another major.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>
>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>>>> what Kenn suggested.
>>>>>>>>
>>>>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>>> processElement and populate add needed.
>>>>>>>>
>>>>>>>
>>>>>>> This is right however in my head it was a single way movemenent to
>>>>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>>>>> and not reactive impl which is what would be done today with both support I
>>>>>>> fear.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>>> it as well.
>>>>>>>>>
>>>>>>>>> This way you register an execution chain. Mixed with streams you
>>>>>>>>> get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>>> performing manner ;).
>>>>>>>>>
>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>
>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>
>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>>>>>> the code?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>>>>>> executorService) {
>>>>>>>>>>>>>>           ... launch some futures, put them in instance vars
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>          ... block on futures, output results if appropriate
>>>>>>>>>>>>>> ...
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing
>>>>>>>>>>>>>> to do the knob tuning to get the threading acceptable for your particular
>>>>>>>>>>>>>> use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls?
>>>>>>>>>>>>>>> It seems bundling the elements into groups of size X prior to passing to
>>>>>>>>>>>>>>> the DoFn, and managing the threading within the function might work.
>>>>>>>>>>>>>>> thoughts?
>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Hmm, thinking out loud but completionstage should/could be extended to
replace processcontext since it represents element and output at the same
time no?

Le 11 mars 2018 00:57, "Kenneth Knowles" <kl...@google.com> a écrit :

> Yea, I think it could. But it is probably more readable to not overload
> the term, plus certainly a bit simpler in implementation. So perhaps
> @AsyncElement to make it very clear.
>
> Kenn
>
> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Ken, can NewDoFn distinguish at generation time the difference between:
>>
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>
>> and
>>
>>     public void process(@Element Input element, ...) {
>>
>> If not, then we would probably need separate annotations....
>>
>>
>>
>>
>>
>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> Nice! I agree that providing a CompletionStage for chaining is much
>>> better than an ExecutorService, and very clear.
>>>
>>> It is very feasible to add support that looks like
>>>
>>>   new DoFn<InputT, OutputT>() {
>>>     @ProcessElement
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>       element.thenApply(...)
>>>     }
>>>   }
>>>
>>> If we had this available, I think users could even experiment with this
>>> often as it might help even where it isn't obvious.
>>>
>>> My main hesitation is that big part of Beam is giving a
>>> basic/imperative style of programming a DoFn that executes in a very smart
>>> functional/parallel way. Full future-oriented programming is not
>>> explored much outside of Javascript (and maybe Haskell) and requires
>>> greater discipline in programming in a functional manner - if you are
>>> mutating stuff in your callback you are going to have bugs, and then when
>>> you add concurrency control you are going to have bad performance and
>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>> all our support effort on teaching advanced programming technique.
>>>
>>> Kenn
>>>
>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> Have you considered drafting in detail what you think this API might
>>>>> look like?
>>>>>
>>>>
>>>>
>>>> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
>>>> list so didn't started to work on it much.
>>>>
>>>>
>>>>>
>>>>> If it's a radically different API, it might be more appropriate as an
>>>>> alternative parallel Beam API rather than a replacement for the current API
>>>>> (there is also one such fluent API in the works).
>>>>>
>>>>
>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>> spoke about before) and then propose to impl it ~natively and move it as
>>>> main API for another major.
>>>>
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>
>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>>> what Kenn suggested.
>>>>>>>
>>>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>>> processElement and populate add needed.
>>>>>>>
>>>>>>
>>>>>> This is right however in my head it was a single way movemenent to
>>>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>>>> and not reactive impl which is what would be done today with both support I
>>>>>> fear.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>>> it as well.
>>>>>>>>
>>>>>>>> This way you register an execution chain. Mixed with streams you
>>>>>>>> get a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>>> performing manner ;).
>>>>>>>>
>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>
>>>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>
>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>>> an unified fashion.
>>>>>>>>>>
>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>
>>>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>>>>> the code?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>>> écrit :
>>>>>>>>>>>>
>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>
>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>>>>> executorService) {
>>>>>>>>>>>>>           ... launch some futures, put them in instance vars
>>>>>>>>>>>>> ...
>>>>>>>>>>>>>       }
>>>>>>>>>>>>>
>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>          ... block on futures, output results if appropriate
>>>>>>>>>>>>> ...
>>>>>>>>>>>>>       }
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing
>>>>>>>>>>>>> to do the knob tuning to get the threading acceptable for your particular
>>>>>>>>>>>>> use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Kenneth Knowles <kl...@google.com>.
Yea, I think it could. But it is probably more readable to not overload the
term, plus certainly a bit simpler in implementation. So perhaps
@AsyncElement to make it very clear.

Kenn

On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:

> Ken, can NewDoFn distinguish at generation time the difference between:
>
>     public void process(@Element CompletionStage<InputT> element, ...) {
>
> and
>
>     public void process(@Element Input element, ...) {
>
> If not, then we would probably need separate annotations....
>
>
>
>
>
> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com> wrote:
>
>> Nice! I agree that providing a CompletionStage for chaining is much
>> better than an ExecutorService, and very clear.
>>
>> It is very feasible to add support that looks like
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...)
>>     }
>>   }
>>
>> If we had this available, I think users could even experiment with this
>> often as it might help even where it isn't obvious.
>>
>> My main hesitation is that big part of Beam is giving a basic/imperative
>> style of programming a DoFn that executes in a very smart
>> functional/parallel way. Full future-oriented programming is not
>> explored much outside of Javascript (and maybe Haskell) and requires
>> greater discipline in programming in a functional manner - if you are
>> mutating stuff in your callback you are going to have bugs, and then when
>> you add concurrency control you are going to have bad performance and
>> deadlocks. So I definitely wouldn't make it the default or want to spend
>> all our support effort on teaching advanced programming technique.
>>
>> Kenn
>>
>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Have you considered drafting in detail what you think this API might
>>>> look like?
>>>>
>>>
>>>
>>> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
>>> list so didn't started to work on it much.
>>>
>>>
>>>>
>>>> If it's a radically different API, it might be more appropriate as an
>>>> alternative parallel Beam API rather than a replacement for the current API
>>>> (there is also one such fluent API in the works).
>>>>
>>>
>>> What I plan is to draft it on top of beam (so the "useless" case I spoke
>>> about before) and then propose to impl it ~natively and move it as main API
>>> for another major.
>>>
>>>
>>>
>>>
>>>>
>>>>
>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>> what Kenn suggested.
>>>>>>
>>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>> processElement and populate add needed.
>>>>>>
>>>>>
>>>>> This is right however in my head it was a single way movemenent to
>>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>>> and not reactive impl which is what would be done today with both support I
>>>>> fear.
>>>>>
>>>>>
>>>>>>
>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>>> it as well.
>>>>>>>
>>>>>>> This way you register an execution chain. Mixed with streams you get
>>>>>>> a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>>> performing manner ;).
>>>>>>>
>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>>> when remote IO are involved.
>>>>>>>>>
>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>>> an unified fashion.
>>>>>>>>>
>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>>>> the code?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>>> écrit :
>>>>>>>>>>>
>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>
>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>>>> executorService) {
>>>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>>>> do the knob tuning to get the threading acceptable for your particular use
>>>>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Our team has a pipeline that make external network calls.
>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>> are slow because we are not threading for our network calls. The github
>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>
>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>> helped with this. However, this was removed because it didn't comply with a
>>>>>>>>>>>>> few BEAM paradigms.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
Ken, can NewDoFn distinguish at generation time the difference between:

    public void process(@Element CompletionStage<InputT> element, ...) {

and

    public void process(@Element Input element, ...) {

If not, then we would probably need separate annotations....





On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <kl...@google.com> wrote:

> Nice! I agree that providing a CompletionStage for chaining is much better
> than an ExecutorService, and very clear.
>
> It is very feasible to add support that looks like
>
>   new DoFn<InputT, OutputT>() {
>     @ProcessElement
>     public void process(@Element CompletionStage<InputT> element, ...) {
>       element.thenApply(...)
>     }
>   }
>
> If we had this available, I think users could even experiment with this
> often as it might help even where it isn't obvious.
>
> My main hesitation is that big part of Beam is giving a basic/imperative
> style of programming a DoFn that executes in a very smart
> functional/parallel way. Full future-oriented programming is not explored
> much outside of Javascript (and maybe Haskell) and requires greater
> discipline in programming in a functional manner - if you are mutating
> stuff in your callback you are going to have bugs, and then when you add
> concurrency control you are going to have bad performance and deadlocks. So
> I definitely wouldn't make it the default or want to spend all our support
> effort on teaching advanced programming technique.
>
> Kenn
>
> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>>
>>
>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> Have you considered drafting in detail what you think this API might
>>> look like?
>>>
>>
>>
>> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
>> list so didn't started to work on it much.
>>
>>
>>>
>>> If it's a radically different API, it might be more appropriate as an
>>> alternative parallel Beam API rather than a replacement for the current API
>>> (there is also one such fluent API in the works).
>>>
>>
>> What I plan is to draft it on top of beam (so the "useless" case I spoke
>> about before) and then propose to impl it ~natively and move it as main API
>> for another major.
>>
>>
>>
>>
>>>
>>>
>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>> what Kenn suggested.
>>>>>
>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>> processElement and populate add needed.
>>>>>
>>>>
>>>> This is right however in my head it was a single way movemenent to
>>>> enforce the design to be reactive and not fake a reactive API with a sync
>>>> and not reactive impl which is what would be done today with both support I
>>>> fear.
>>>>
>>>>
>>>>>
>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>> Yes, for the dofn for instance, instead of having
>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>>> it as well.
>>>>>>
>>>>>> This way you register an execution chain. Mixed with streams you get
>>>>>> a big data java 8/9/10 API which enabkes any connectivity in a wel
>>>>>> performing manner ;).
>>>>>>
>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>
>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>>> when remote IO are involved.
>>>>>>>>
>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>>> an unified fashion.
>>>>>>>>
>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>
>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>>> the code?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> What do you mean by reactive?
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a
>>>>>>>>>> écrit :
>>>>>>>>>>
>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>
>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>>> executorService) {
>>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>>       }
>>>>>>>>>>>
>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>>> of what is going on in a computation to, for example, share a thread pool
>>>>>>>>>>> between different bits. This was one reason to delete
>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>
>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>>> do the knob tuning to get the threading acceptable for your particular use
>>>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>
>>>>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>
>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>>>>>> BEAM paradigms.
>>>>>>>>>>>>
>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>
>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>
>>

Re: Advice on parallelizing network calls in DoFn

Posted by Kenneth Knowles <kl...@google.com>.
Nice! I agree that providing a CompletionStage for chaining is much better
than an ExecutorService, and very clear.

It is very feasible to add support that looks like

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

If we had this available, I think users could even experiment with this
often as it might help even where it isn't obvious.

My main hesitation is that big part of Beam is giving a basic/imperative
style of programming a DoFn that executes in a very smart
functional/parallel way. Full future-oriented programming is not explored
much outside of Javascript (and maybe Haskell) and requires greater
discipline in programming in a functional manner - if you are mutating
stuff in your callback you are going to have bugs, and then when you add
concurrency control you are going to have bad performance and deadlocks. So
I definitely wouldn't make it the default or want to spend all our support
effort on teaching advanced programming technique.

Kenn

On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

>
>
> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Have you considered drafting in detail what you think this API might look
>> like?
>>
>
>
> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
> list so didn't started to work on it much.
>
>
>>
>> If it's a radically different API, it might be more appropriate as an
>> alternative parallel Beam API rather than a replacement for the current API
>> (there is also one such fluent API in the works).
>>
>
> What I plan is to draft it on top of beam (so the "useless" case I spoke
> about before) and then propose to impl it ~natively and move it as main API
> for another major.
>
>
>
>
>>
>>
>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>>>> Kenn suggested.
>>>>
>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>> processElement and populate add needed.
>>>>
>>>
>>> This is right however in my head it was a single way movemenent to
>>> enforce the design to be reactive and not fake a reactive API with a sync
>>> and not reactive impl which is what would be done today with both support I
>>> fear.
>>>
>>>
>>>>
>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> Yes, for the dofn for instance, instead of having
>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>> it as well.
>>>>>
>>>>> This way you register an execution chain. Mixed with streams you get a
>>>>> big data java 8/9/10 API which enabkes any connectivity in a wel performing
>>>>> manner ;).
>>>>>
>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>> was suggesting, unless I'm missing something.
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>> when remote IO are involved.
>>>>>>>
>>>>>>> However it requires to break source, sdf design to use
>>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>>> an unified fashion.
>>>>>>>
>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>> the code?
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> What do you mean by reactive?
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>>>> we envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>>>>>> parameters that you can use as you wish.
>>>>>>>>>>
>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>       @ProcessElement
>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>> executorService) {
>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>>       @FinishBundle
>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>> of what is going on in a computation to, for example, share a thread pool
>>>>>>>>>> between different bits. This was one reason to delete
>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>
>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>> do the knob tuning to get the threading acceptable for your particular use
>>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all:
>>>>>>>>>>>
>>>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>
>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>>>>> BEAM paradigms.
>>>>>>>>>>>
>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>
>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:

> Have you considered drafting in detail what you think this API might look
> like?
>


Yes, but it is after the "enhancements" - for my use cases - and "bugs"
list so didn't started to work on it much.


>
> If it's a radically different API, it might be more appropriate as an
> alternative parallel Beam API rather than a replacement for the current API
> (there is also one such fluent API in the works).
>

What I plan is to draft it on top of beam (so the "useless" case I spoke
about before) and then propose to impl it ~natively and move it as main API
for another major.




>
>
> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>>
>>
>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>>> Kenn suggested.
>>>
>>> Note that with NewDoFn this need not be incompatible (so might not
>>> require waiting till Beam 3.0). We can recognize new parameters to
>>> processElement and populate add needed.
>>>
>>
>> This is right however in my head it was a single way movemenent to
>> enforce the design to be reactive and not fake a reactive API with a sync
>> and not reactive impl which is what would be done today with both support I
>> fear.
>>
>>
>>>
>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rm...@gmail.com>
>>> wrote:
>>>
>>>> Yes, for the dofn for instance, instead of having
>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>> it as well.
>>>>
>>>> This way you register an execution chain. Mixed with streams you get a
>>>> big data java 8/9/10 API which enabkes any connectivity in a wel performing
>>>> manner ;).
>>>>
>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>
>>>>> So you mean the user should have a way of registering asynchronous
>>>>> activity with a callback (the callback must be registered with Beam,
>>>>> because Beam needs to know not to mark the element as done until all
>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>> was suggesting, unless I'm missing something.
>>>>>
>>>>>
>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>> rmannibucau@gmail.com> wrote:
>>>>>
>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>> issues properly and to have better scalability on the overall execution
>>>>>> when remote IO are involved.
>>>>>>
>>>>>> However it requires to break source, sdf design to use
>>>>>> completionstage - or equivalent - to chain the processing properly and in
>>>>>> an unified fashion.
>>>>>>
>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>
>>>>>> If you're talking about reactive programming, at a certain level beam
>>>>>> is already reactive. Are you referring to a specific way of writing the
>>>>>> code?
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> What do you mean by reactive?
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>>
>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>> scale way more without having to hardly synchronize multithreading. Elegant
>>>>>>>> and efficient :). Beam 3?
>>>>>>>>
>>>>>>>>
>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>>
>>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>>> we envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>>>>> parameters that you can use as you wish.
>>>>>>>>>
>>>>>>>>>     new DoFn<>() {
>>>>>>>>>       @ProcessElement
>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>> executorService) {
>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>       }
>>>>>>>>>
>>>>>>>>>       @FinishBundle
>>>>>>>>>       public void finish(...) {
>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>> of what is going on in a computation to, for example, share a thread pool
>>>>>>>>> between different bits. This was one reason to delete
>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>> FinishBundle, etc.
>>>>>>>>>
>>>>>>>>> I haven't explored rolling your own here, if you are willing to do
>>>>>>>>> the knob tuning to get the threading acceptable for your particular use
>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all:
>>>>>>>>>>
>>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>
>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>>>> BEAM paradigms.
>>>>>>>>>>
>>>>>>>>>> Questions going forward:
>>>>>>>>>>
>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>
>>>>>>
>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
Have you considered drafting in detail what you think this API might look
like?

If it's a radically different API, it might be more appropriate as an
alternative parallel Beam API rather than a replacement for the current API
(there is also one such fluent API in the works).


On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rm...@gmail.com>
wrote:

>
>
> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>> Kenn suggested.
>>
>> Note that with NewDoFn this need not be incompatible (so might not
>> require waiting till Beam 3.0). We can recognize new parameters to
>> processElement and populate add needed.
>>
>
> This is right however in my head it was a single way movemenent to enforce
> the design to be reactive and not fake a reactive API with a sync and not
> reactive impl which is what would be done today with both support I fear.
>
>
>>
>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Yes, for the dofn for instance, instead of having
>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>> it as well.
>>>
>>> This way you register an execution chain. Mixed with streams you get a
>>> big data java 8/9/10 API which enabkes any connectivity in a wel performing
>>> manner ;).
>>>
>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> So you mean the user should have a way of registering asynchronous
>>>> activity with a callback (the callback must be registered with Beam,
>>>> because Beam needs to know not to mark the element as done until all
>>>> associated callbacks have completed). I think that's basically what Kenn
>>>> was suggesting, unless I'm missing something.
>>>>
>>>>
>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>> rmannibucau@gmail.com> wrote:
>>>>
>>>>> Yes, callback based. Beam today is synchronous and until
>>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>> issues properly and to have better scalability on the overall execution
>>>>> when remote IO are involved.
>>>>>
>>>>> However it requires to break source, sdf design to use completionstage
>>>>> - or equivalent - to chain the processing properly and in an unified
>>>>> fashion.
>>>>>
>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>> If you're talking about reactive programming, at a certain level beam
>>>>> is already reactive. Are you referring to a specific way of writing the
>>>>> code?
>>>>>
>>>>>
>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> What do you mean by reactive?
>>>>>>
>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>> rmannibucau@gmail.com> wrote:
>>>>>>
>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to scale
>>>>>>> way more without having to hardly synchronize multithreading. Elegant and
>>>>>>> efficient :). Beam 3?
>>>>>>>
>>>>>>>
>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>>
>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>> we envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>>>> parameters that you can use as you wish.
>>>>>>>>
>>>>>>>>     new DoFn<>() {
>>>>>>>>       @ProcessElement
>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>> executorService) {
>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>       }
>>>>>>>>
>>>>>>>>       @FinishBundle
>>>>>>>>       public void finish(...) {
>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>
>>>>>>>> This way, the Java SDK harness can use its overarching knowledge of
>>>>>>>> what is going on in a computation to, for example, share a thread pool
>>>>>>>> between different bits. This was one reason to delete
>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>> FinishBundle, etc.
>>>>>>>>
>>>>>>>> I haven't explored rolling your own here, if you are willing to do
>>>>>>>> the knob tuning to get the threading acceptable for your particular use
>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>>
>>>>>>>>> Hello all:
>>>>>>>>>
>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>>> below provides some discussion around this:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>
>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>>> BEAM paradigms.
>>>>>>>>>
>>>>>>>>> Questions going forward:
>>>>>>>>>
>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>> seems bundling the elements into groups of size X prior to passing to the
>>>>>>>>> DoFn, and managing the threading within the function might work. thoughts?
>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>
>>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:

> This is another version (maybe a better, Java 8 idiomatic one?) of what
> Kenn suggested.
>
> Note that with NewDoFn this need not be incompatible (so might not require
> waiting till Beam 3.0). We can recognize new parameters to processElement
> and populate add needed.
>

This is right however in my head it was a single way movemenent to enforce
the design to be reactive and not fake a reactive API with a sync and not
reactive impl which is what would be done today with both support I fear.


>
> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> Yes, for the dofn for instance, instead of having
>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>> it as well.
>>
>> This way you register an execution chain. Mixed with streams you get a
>> big data java 8/9/10 API which enabkes any connectivity in a wel performing
>> manner ;).
>>
>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>
>>> So you mean the user should have a way of registering asynchronous
>>> activity with a callback (the callback must be registered with Beam,
>>> because Beam needs to know not to mark the element as done until all
>>> associated callbacks have completed). I think that's basically what Kenn
>>> was suggesting, unless I'm missing something.
>>>
>>>
>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>> rmannibucau@gmail.com> wrote:
>>>
>>>> Yes, callback based. Beam today is synchronous and until
>>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>>> other parts do. Becoming reactive will enable to manage the threading
>>>> issues properly and to have better scalability on the overall execution
>>>> when remote IO are involved.
>>>>
>>>> However it requires to break source, sdf design to use completionstage
>>>> - or equivalent - to chain the processing properly and in an unified
>>>> fashion.
>>>>
>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>
>>>> If you're talking about reactive programming, at a certain level beam
>>>> is already reactive. Are you referring to a specific way of writing the
>>>> code?
>>>>
>>>>
>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> What do you mean by reactive?
>>>>>
>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to scale
>>>>>> way more without having to hardly synchronize multithreading. Elegant and
>>>>>> efficient :). Beam 3?
>>>>>>
>>>>>>
>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>>
>>>>>>> I will start with the "exciting futuristic" answer, which is that we
>>>>>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>>> parameters that you can use as you wish.
>>>>>>>
>>>>>>>     new DoFn<>() {
>>>>>>>       @ProcessElement
>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>> executorService) {
>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>       }
>>>>>>>
>>>>>>>       @FinishBundle
>>>>>>>       public void finish(...) {
>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>       }
>>>>>>>     }
>>>>>>>
>>>>>>> This way, the Java SDK harness can use its overarching knowledge of
>>>>>>> what is going on in a computation to, for example, share a thread pool
>>>>>>> between different bits. This was one reason to delete
>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>> FinishBundle, etc.
>>>>>>>
>>>>>>> I haven't explored rolling your own here, if you are willing to do
>>>>>>> the knob tuning to get the threading acceptable for your particular use
>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>>
>>>>>>>> Hello all:
>>>>>>>>
>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>>> below provides some discussion around this:
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>
>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>> with this. However, this was removed because it didn't comply with a few
>>>>>>>> BEAM paradigms.
>>>>>>>>
>>>>>>>> Questions going forward:
>>>>>>>>
>>>>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>>>>>> and managing the threading within the function might work. thoughts?
>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>
>>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
This is another version (maybe a better, Java 8 idiomatic one?) of what
Kenn suggested.

Note that with NewDoFn this need not be incompatible (so might not require
waiting till Beam 3.0). We can recognize new parameters to processElement
and populate add needed.

On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Yes, for the dofn for instance, instead of having
> processcontext.element()=<T> you get a CompletionStage<T> and output gets
> it as well.
>
> This way you register an execution chain. Mixed with streams you get a big
> data java 8/9/10 API which enabkes any connectivity in a wel performing
> manner ;).
>
> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>
>> So you mean the user should have a way of registering asynchronous
>> activity with a callback (the callback must be registered with Beam,
>> because Beam needs to know not to mark the element as done until all
>> associated callbacks have completed). I think that's basically what Kenn
>> was suggesting, unless I'm missing something.
>>
>>
>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> Yes, callback based. Beam today is synchronous and until
>>> bundles+combines are reactive friendly, beam will be synchronous whatever
>>> other parts do. Becoming reactive will enable to manage the threading
>>> issues properly and to have better scalability on the overall execution
>>> when remote IO are involved.
>>>
>>> However it requires to break source, sdf design to use completionstage -
>>> or equivalent - to chain the processing properly and in an unified fashion.
>>>
>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>> If you're talking about reactive programming, at a certain level beam is
>>> already reactive. Are you referring to a specific way of writing the code?
>>>
>>>
>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> What do you mean by reactive?
>>>>
>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
>>>> wrote:
>>>>
>>>>> @Kenn: why not preferring to make beam reactive? Would alow to scale
>>>>> way more without having to hardly synchronize multithreading. Elegant and
>>>>> efficient :). Beam 3?
>>>>>
>>>>>
>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>>
>>>>>> I will start with the "exciting futuristic" answer, which is that we
>>>>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>>>>> parameters that you can use as you wish.
>>>>>>
>>>>>>     new DoFn<>() {
>>>>>>       @ProcessElement
>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>> executorService) {
>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>       }
>>>>>>
>>>>>>       @FinishBundle
>>>>>>       public void finish(...) {
>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> This way, the Java SDK harness can use its overarching knowledge of
>>>>>> what is going on in a computation to, for example, share a thread pool
>>>>>> between different bits. This was one reason to delete
>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>> on outstanding requests versus it being your responsibility in
>>>>>> FinishBundle, etc.
>>>>>>
>>>>>> I haven't explored rolling your own here, if you are willing to do
>>>>>> the knob tuning to get the threading acceptable for your particular use
>>>>>> case. Perhaps someone else can weigh in.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>>
>>>>>>> Hello all:
>>>>>>>
>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>>> below provides some discussion around this:
>>>>>>>
>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>
>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>>>>> paradigms.
>>>>>>>
>>>>>>> Questions going forward:
>>>>>>>
>>>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>>>>> and managing the threading within the function might work. thoughts?
>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>
>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Yes, for the dofn for instance, instead of having
processcontext.element()=<T> you get a CompletionStage<T> and output gets
it as well.

This way you register an execution chain. Mixed with streams you get a big
data java 8/9/10 API which enabkes any connectivity in a wel performing
manner ;).

Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :

> So you mean the user should have a way of registering asynchronous
> activity with a callback (the callback must be registered with Beam,
> because Beam needs to know not to mark the element as done until all
> associated callbacks have completed). I think that's basically what Kenn
> was suggesting, unless I'm missing something.
>
>
> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> Yes, callback based. Beam today is synchronous and until bundles+combines
>> are reactive friendly, beam will be synchronous whatever other parts do.
>> Becoming reactive will enable to manage the threading issues properly and
>> to have better scalability on the overall execution when remote IO are
>> involved.
>>
>> However it requires to break source, sdf design to use completionstage -
>> or equivalent - to chain the processing properly and in an unified fashion.
>>
>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>
>> If you're talking about reactive programming, at a certain level beam is
>> already reactive. Are you referring to a specific way of writing the code?
>>
>>
>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>
>>> What do you mean by reactive?
>>>
>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
>>> wrote:
>>>
>>>> @Kenn: why not preferring to make beam reactive? Would alow to scale
>>>> way more without having to hardly synchronize multithreading. Elegant and
>>>> efficient :). Beam 3?
>>>>
>>>>
>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>>
>>>>> I will start with the "exciting futuristic" answer, which is that we
>>>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>>>> parameters that you can use as you wish.
>>>>>
>>>>>     new DoFn<>() {
>>>>>       @ProcessElement
>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>> executorService) {
>>>>>           ... launch some futures, put them in instance vars ...
>>>>>       }
>>>>>
>>>>>       @FinishBundle
>>>>>       public void finish(...) {
>>>>>          ... block on futures, output results if appropriate ...
>>>>>       }
>>>>>     }
>>>>>
>>>>> This way, the Java SDK harness can use its overarching knowledge of
>>>>> what is going on in a computation to, for example, share a thread pool
>>>>> between different bits. This was one reason to delete
>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>> runner should own parallelizing to max out cores and what user code needs
>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>> on outstanding requests versus it being your responsibility in
>>>>> FinishBundle, etc.
>>>>>
>>>>> I haven't explored rolling your own here, if you are willing to do the
>>>>> knob tuning to get the threading acceptable for your particular use case.
>>>>> Perhaps someone else can weigh in.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>> josh.ferge@bounceexchange.com> wrote:
>>>>>
>>>>>> Hello all:
>>>>>>
>>>>>> Our team has a pipeline that make external network calls. These
>>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>>> slow because we are not threading for our network calls. The github issue
>>>>>> below provides some discussion around this:
>>>>>>
>>>>>> https://github.com/apache/beam/pull/957
>>>>>>
>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>>>> paradigms.
>>>>>>
>>>>>> Questions going forward:
>>>>>>
>>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>>>> and managing the threading within the function might work. thoughts?
>>>>>> Are these types of jobs even suitable for beam?
>>>>>> Are there any plans to develop features that help with this?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
So you mean the user should have a way of registering asynchronous activity
with a callback (the callback must be registered with Beam, because Beam
needs to know not to mark the element as done until all associated
callbacks have completed). I think that's basically what Kenn was
suggesting, unless I'm missing something.


On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> Yes, callback based. Beam today is synchronous and until bundles+combines
> are reactive friendly, beam will be synchronous whatever other parts do.
> Becoming reactive will enable to manage the threading issues properly and
> to have better scalability on the overall execution when remote IO are
> involved.
>
> However it requires to break source, sdf design to use completionstage -
> or equivalent - to chain the processing properly and in an unified fashion.
>
> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>
> If you're talking about reactive programming, at a certain level beam is
> already reactive. Are you referring to a specific way of writing the code?
>
>
> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>
>> What do you mean by reactive?
>>
>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
>> wrote:
>>
>>> @Kenn: why not preferring to make beam reactive? Would alow to scale way
>>> more without having to hardly synchronize multithreading. Elegant and
>>> efficient :). Beam 3?
>>>
>>>
>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>>
>>>> I will start with the "exciting futuristic" answer, which is that we
>>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>>> parameters that you can use as you wish.
>>>>
>>>>     new DoFn<>() {
>>>>       @ProcessElement
>>>>       public void process(ProcessContext ctx, ExecutorService
>>>> executorService) {
>>>>           ... launch some futures, put them in instance vars ...
>>>>       }
>>>>
>>>>       @FinishBundle
>>>>       public void finish(...) {
>>>>          ... block on futures, output results if appropriate ...
>>>>       }
>>>>     }
>>>>
>>>> This way, the Java SDK harness can use its overarching knowledge of
>>>> what is going on in a computation to, for example, share a thread pool
>>>> between different bits. This was one reason to delete
>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>> properly manage how many things were going on concurrently. And mostly the
>>>> runner should own parallelizing to max out cores and what user code needs
>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>> on outstanding requests versus it being your responsibility in
>>>> FinishBundle, etc.
>>>>
>>>> I haven't explored rolling your own here, if you are willing to do the
>>>> knob tuning to get the threading acceptable for your particular use case.
>>>> Perhaps someone else can weigh in.
>>>>
>>>> Kenn
>>>>
>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>> josh.ferge@bounceexchange.com> wrote:
>>>>
>>>>> Hello all:
>>>>>
>>>>> Our team has a pipeline that make external network calls. These
>>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>>> slow because we are not threading for our network calls. The github issue
>>>>> below provides some discussion around this:
>>>>>
>>>>> https://github.com/apache/beam/pull/957
>>>>>
>>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>>> paradigms.
>>>>>
>>>>> Questions going forward:
>>>>>
>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>>> and managing the threading within the function might work. thoughts?
>>>>> Are these types of jobs even suitable for beam?
>>>>> Are there any plans to develop features that help with this?
>>>>>
>>>>> Thanks
>>>>>
>>>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
Yes, callback based. Beam today is synchronous and until bundles+combines
are reactive friendly, beam will be synchronous whatever other parts do.
Becoming reactive will enable to manage the threading issues properly and
to have better scalability on the overall execution when remote IO are
involved.

However it requires to break source, sdf design to use completionstage - or
equivalent - to chain the processing properly and in an unified fashion.

Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :

If you're talking about reactive programming, at a certain level beam is
already reactive. Are you referring to a specific way of writing the code?


On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:

> What do you mean by reactive?
>
> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> @Kenn: why not preferring to make beam reactive? Would alow to scale way
>> more without having to hardly synchronize multithreading. Elegant and
>> efficient :). Beam 3?
>>
>>
>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>
>>> I will start with the "exciting futuristic" answer, which is that we
>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>> parameters that you can use as you wish.
>>>
>>>     new DoFn<>() {
>>>       @ProcessElement
>>>       public void process(ProcessContext ctx, ExecutorService
>>> executorService) {
>>>           ... launch some futures, put them in instance vars ...
>>>       }
>>>
>>>       @FinishBundle
>>>       public void finish(...) {
>>>          ... block on futures, output results if appropriate ...
>>>       }
>>>     }
>>>
>>> This way, the Java SDK harness can use its overarching knowledge of what
>>> is going on in a computation to, for example, share a thread pool between
>>> different bits. This was one reason to delete IntraBundleParallelization -
>>> it didn't allow the runner and user code to properly manage how many things
>>> were going on concurrently. And mostly the runner should own parallelizing
>>> to max out cores and what user code needs is asynchrony hooks that can
>>> interact with that. However, this feature is not thoroughly considered. TBD
>>> how much the harness itself manages blocking on outstanding requests versus
>>> it being your responsibility in FinishBundle, etc.
>>>
>>> I haven't explored rolling your own here, if you are willing to do the
>>> knob tuning to get the threading acceptable for your particular use case.
>>> Perhaps someone else can weigh in.
>>>
>>> Kenn
>>>
>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
>>> wrote:
>>>
>>>> Hello all:
>>>>
>>>> Our team has a pipeline that make external network calls. These
>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>> slow because we are not threading for our network calls. The github issue
>>>> below provides some discussion around this:
>>>>
>>>> https://github.com/apache/beam/pull/957
>>>>
>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>> paradigms.
>>>>
>>>> Questions going forward:
>>>>
>>>> What is advised for jobs that make blocking network calls? It seems
>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>> and managing the threading within the function might work. thoughts?
>>>> Are these types of jobs even suitable for beam?
>>>> Are there any plans to develop features that help with this?
>>>>
>>>> Thanks
>>>>
>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
If you're talking about reactive programming, at a certain level beam is
already reactive. Are you referring to a specific way of writing the code?


On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:

> What do you mean by reactive?
>
> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
> wrote:
>
>> @Kenn: why not preferring to make beam reactive? Would alow to scale way
>> more without having to hardly synchronize multithreading. Elegant and
>> efficient :). Beam 3?
>>
>>
>> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>>
>>> I will start with the "exciting futuristic" answer, which is that we
>>> envision the new DoFn to be able to provide an automatic ExecutorService
>>> parameters that you can use as you wish.
>>>
>>>     new DoFn<>() {
>>>       @ProcessElement
>>>       public void process(ProcessContext ctx, ExecutorService
>>> executorService) {
>>>           ... launch some futures, put them in instance vars ...
>>>       }
>>>
>>>       @FinishBundle
>>>       public void finish(...) {
>>>          ... block on futures, output results if appropriate ...
>>>       }
>>>     }
>>>
>>> This way, the Java SDK harness can use its overarching knowledge of what
>>> is going on in a computation to, for example, share a thread pool between
>>> different bits. This was one reason to delete IntraBundleParallelization -
>>> it didn't allow the runner and user code to properly manage how many things
>>> were going on concurrently. And mostly the runner should own parallelizing
>>> to max out cores and what user code needs is asynchrony hooks that can
>>> interact with that. However, this feature is not thoroughly considered. TBD
>>> how much the harness itself manages blocking on outstanding requests versus
>>> it being your responsibility in FinishBundle, etc.
>>>
>>> I haven't explored rolling your own here, if you are willing to do the
>>> knob tuning to get the threading acceptable for your particular use case.
>>> Perhaps someone else can weigh in.
>>>
>>> Kenn
>>>
>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
>>> wrote:
>>>
>>>> Hello all:
>>>>
>>>> Our team has a pipeline that make external network calls. These
>>>> pipelines are currently super slow, and the hypothesis is that they are
>>>> slow because we are not threading for our network calls. The github issue
>>>> below provides some discussion around this:
>>>>
>>>> https://github.com/apache/beam/pull/957
>>>>
>>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>>> this. However, this was removed because it didn't comply with a few BEAM
>>>> paradigms.
>>>>
>>>> Questions going forward:
>>>>
>>>> What is advised for jobs that make blocking network calls? It seems
>>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>>> and managing the threading within the function might work. thoughts?
>>>> Are these types of jobs even suitable for beam?
>>>> Are there any plans to develop features that help with this?
>>>>
>>>> Thanks
>>>>
>>>

Re: Advice on parallelizing network calls in DoFn

Posted by Reuven Lax <re...@google.com>.
What do you mean by reactive?

On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rm...@gmail.com>
wrote:

> @Kenn: why not preferring to make beam reactive? Would alow to scale way
> more without having to hardly synchronize multithreading. Elegant and
> efficient :). Beam 3?
>
>
> Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :
>
>> I will start with the "exciting futuristic" answer, which is that we
>> envision the new DoFn to be able to provide an automatic ExecutorService
>> parameters that you can use as you wish.
>>
>>     new DoFn<>() {
>>       @ProcessElement
>>       public void process(ProcessContext ctx, ExecutorService
>> executorService) {
>>           ... launch some futures, put them in instance vars ...
>>       }
>>
>>       @FinishBundle
>>       public void finish(...) {
>>          ... block on futures, output results if appropriate ...
>>       }
>>     }
>>
>> This way, the Java SDK harness can use its overarching knowledge of what
>> is going on in a computation to, for example, share a thread pool between
>> different bits. This was one reason to delete IntraBundleParallelization -
>> it didn't allow the runner and user code to properly manage how many things
>> were going on concurrently. And mostly the runner should own parallelizing
>> to max out cores and what user code needs is asynchrony hooks that can
>> interact with that. However, this feature is not thoroughly considered. TBD
>> how much the harness itself manages blocking on outstanding requests versus
>> it being your responsibility in FinishBundle, etc.
>>
>> I haven't explored rolling your own here, if you are willing to do the
>> knob tuning to get the threading acceptable for your particular use case.
>> Perhaps someone else can weigh in.
>>
>> Kenn
>>
>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
>> wrote:
>>
>>> Hello all:
>>>
>>> Our team has a pipeline that make external network calls. These
>>> pipelines are currently super slow, and the hypothesis is that they are
>>> slow because we are not threading for our network calls. The github issue
>>> below provides some discussion around this:
>>>
>>> https://github.com/apache/beam/pull/957
>>>
>>> In beam 1.0, there was IntraBundleParallelization, which helped with
>>> this. However, this was removed because it didn't comply with a few BEAM
>>> paradigms.
>>>
>>> Questions going forward:
>>>
>>> What is advised for jobs that make blocking network calls? It seems
>>> bundling the elements into groups of size X prior to passing to the DoFn,
>>> and managing the threading within the function might work. thoughts?
>>> Are these types of jobs even suitable for beam?
>>> Are there any plans to develop features that help with this?
>>>
>>> Thanks
>>>
>>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
@Kenn: why not preferring to make beam reactive? Would alow to scale way
more without having to hardly synchronize multithreading. Elegant and
efficient :). Beam 3?


Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :

> I will start with the "exciting futuristic" answer, which is that we
> envision the new DoFn to be able to provide an automatic ExecutorService
> parameters that you can use as you wish.
>
>     new DoFn<>() {
>       @ProcessElement
>       public void process(ProcessContext ctx, ExecutorService
> executorService) {
>           ... launch some futures, put them in instance vars ...
>       }
>
>       @FinishBundle
>       public void finish(...) {
>          ... block on futures, output results if appropriate ...
>       }
>     }
>
> This way, the Java SDK harness can use its overarching knowledge of what
> is going on in a computation to, for example, share a thread pool between
> different bits. This was one reason to delete IntraBundleParallelization -
> it didn't allow the runner and user code to properly manage how many things
> were going on concurrently. And mostly the runner should own parallelizing
> to max out cores and what user code needs is asynchrony hooks that can
> interact with that. However, this feature is not thoroughly considered. TBD
> how much the harness itself manages blocking on outstanding requests versus
> it being your responsibility in FinishBundle, etc.
>
> I haven't explored rolling your own here, if you are willing to do the
> knob tuning to get the threading acceptable for your particular use case.
> Perhaps someone else can weigh in.
>
> Kenn
>
> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
> wrote:
>
>> Hello all:
>>
>> Our team has a pipeline that make external network calls. These pipelines
>> are currently super slow, and the hypothesis is that they are slow because
>> we are not threading for our network calls. The github issue below provides
>> some discussion around this:
>>
>> https://github.com/apache/beam/pull/957
>>
>> In beam 1.0, there was IntraBundleParallelization, which helped with
>> this. However, this was removed because it didn't comply with a few BEAM
>> paradigms.
>>
>> Questions going forward:
>>
>> What is advised for jobs that make blocking network calls? It seems
>> bundling the elements into groups of size X prior to passing to the DoFn,
>> and managing the threading within the function might work. thoughts?
>> Are these types of jobs even suitable for beam?
>> Are there any plans to develop features that help with this?
>>
>> Thanks
>>
>

Re: Advice on parallelizing network calls in DoFn

Posted by Romain Manni-Bucau <rm...@gmail.com>.
@Kenn: why not preferring to make beam reactive? Would alow to scale way
more without having to hardly synchronize multithreading. Elegant and
efficient :). Beam 3?


Le 9 mars 2018 22:49, "Kenneth Knowles" <kl...@google.com> a écrit :

> I will start with the "exciting futuristic" answer, which is that we
> envision the new DoFn to be able to provide an automatic ExecutorService
> parameters that you can use as you wish.
>
>     new DoFn<>() {
>       @ProcessElement
>       public void process(ProcessContext ctx, ExecutorService
> executorService) {
>           ... launch some futures, put them in instance vars ...
>       }
>
>       @FinishBundle
>       public void finish(...) {
>          ... block on futures, output results if appropriate ...
>       }
>     }
>
> This way, the Java SDK harness can use its overarching knowledge of what
> is going on in a computation to, for example, share a thread pool between
> different bits. This was one reason to delete IntraBundleParallelization -
> it didn't allow the runner and user code to properly manage how many things
> were going on concurrently. And mostly the runner should own parallelizing
> to max out cores and what user code needs is asynchrony hooks that can
> interact with that. However, this feature is not thoroughly considered. TBD
> how much the harness itself manages blocking on outstanding requests versus
> it being your responsibility in FinishBundle, etc.
>
> I haven't explored rolling your own here, if you are willing to do the
> knob tuning to get the threading acceptable for your particular use case.
> Perhaps someone else can weigh in.
>
> Kenn
>
> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <jo...@bounceexchange.com>
> wrote:
>
>> Hello all:
>>
>> Our team has a pipeline that make external network calls. These pipelines
>> are currently super slow, and the hypothesis is that they are slow because
>> we are not threading for our network calls. The github issue below provides
>> some discussion around this:
>>
>> https://github.com/apache/beam/pull/957
>>
>> In beam 1.0, there was IntraBundleParallelization, which helped with
>> this. However, this was removed because it didn't comply with a few BEAM
>> paradigms.
>>
>> Questions going forward:
>>
>> What is advised for jobs that make blocking network calls? It seems
>> bundling the elements into groups of size X prior to passing to the DoFn,
>> and managing the threading within the function might work. thoughts?
>> Are these types of jobs even suitable for beam?
>> Are there any plans to develop features that help with this?
>>
>> Thanks
>>
>