Posted to dev@beam.apache.org by Xinyu Liu <xi...@gmail.com> on 2019/01/22 20:43:47 UTC

[DISCUSSION] ParDo Async Java API

Hi, guys,

As more users try out Beam running on the SamzaRunner, we got a lot of asks
for an asynchronous processing API. There are a few reasons for these asks:

   - The users here are experienced in asynchronous programming. With async
   frameworks such as Netty and ParSeq and libs like async jersey client, they
   are able to make remote calls efficiently and the libraries help manage the
   execution threads underneath. Async remote calls are very common in most of
   our streaming applications today.
   - Many jobs are running on a multi-tenancy cluster. Async processing
   reduces resource usage and speeds up computation (fewer context switches).

I asked about the async support in a previous email thread. The following
API was mentioned in the reply:

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

We are wondering whether there have been any discussions on this API and
related docs. It is awesome that you already considered having DoFn process
asynchronously. Out of curiosity, this API seems to create a
CompletionStage out of the input element (probably using the framework's
executor) and then allow the user to chain on it. To us, it seems more
convenient if the DoFn outputs a CompletionStage<OutputT>, or is passed a
CompletionStage<OutputT> to complete when processing finishes.
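
To make the two alternatives concrete, here is a rough sketch (hypothetical
signatures for discussion only; asyncClient stands in for any async client
library):

  // Alternative A: the DoFn emits a future for each element.
  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element InputT element,
                        @Output OutputReceiver<CompletionStage<OutputT>> out) {
      out.output(asyncClient.call(element));
    }
  }

  // Alternative B: the framework passes in a future for the DoFn to complete.
  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element InputT element, CompletableFuture<OutputT> result) {
      asyncClient.call(element).whenComplete((output, error) -> {
        if (error != null) {
          result.completeExceptionally(error);
        } else {
          result.complete(output);
        }
      });
    }
  }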

We would like to discuss the async API further, and hopefully we will end
up with great async support in Beam. Really appreciate the feedback!

Thanks,
Xinyu

Re: [DISCUSSION] ParDo Async Java API

Posted by Xinyu Liu <xi...@gmail.com>.
I can speak from Samza's perspective: Samza only commits messages once
their async callbacks have completed. If there are any failures, it
recovers from the last checkpoint and reprocesses the messages for which
completions were never received, so no data is lost. The "Guaranteed
Semantics" section in [1] has a bit more detail. Judging by the "Fault
Tolerance Guarantees" section in [2], I believe Flink honors the same
semantics.

Thanks,
Xinyu

[1]:
https://samza.apache.org/learn/tutorials/0.11/samza-async-user-guide.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
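
For reference, the callback pattern in Samza's task API looks roughly like
the sketch below (based on the javadocs; asyncClient and toOutgoingEnvelope
are placeholders). Samza commits a message only after callback.complete():

  public class RemoteCallTask implements AsyncStreamTask {
    @Override
    public void processAsync(IncomingMessageEnvelope envelope,
                             MessageCollector collector,
                             TaskCoordinator coordinator,
                             TaskCallback callback) {
      asyncClient.call(envelope.getMessage())   // placeholder async client
          .whenComplete((result, error) -> {
            if (error != null) {
              callback.failure(error);          // message will be reprocessed
            } else {
              collector.send(toOutgoingEnvelope(result));
              callback.complete();              // message can now be committed
            }
          });
    }
  }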

On Tue, Jan 22, 2019 at 5:23 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xi...@gmail.com> wrote:
>
>> @Steve: it's good to see that this is going to be useful in your use
>> cases as well. Thanks for sharing the code from Scio! I can see in your
>> implementation that waiting for the future completion is part of the
>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>> async support so the user-level code won't need to implement this logic,
>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>> after future completion[1], and Flink also has AsyncFunction api [2] which
>> provides a ResultFuture similar to the API we discussed.
>>
>
> Can this be done correctly? What I mean is that if the process dies, can
> you guarantee that no data is lost? Beam currently guarantees this for
> FinishBundle, but if you use an arbitrary async framework this might not be
> true.
>
>
>> A simple use case for this is to execute a Runnable asynchronously in
>> user's own executor. The following code illustrates Kenn's option #2, with
>> a very simple single-thread pool being the executor:
>>
>> new DoFn<InputT, OutputT>() {
>>   @ProcessElement
>>   public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>         () -> someOutput,
>>         Executors.newSingleThreadExecutor());
>>     outputReceiver.output(future);
>>   }
>> }
>>
>> The neat thing about this API is that the user can choose their own async framework and we only expect the output to be a CompletionStage.
>>
>>
>> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
>>
>> Thanks,
>> Xinyu
>>
>>
>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
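
A rough sketch of that bundle composition from the runner's side
(elementFutures and invokeFinishBundle are placeholder names, not real
runner internals):

  // Gather the future produced for each element in the bundle, and only
  // run the user's @FinishBundle once all of them have resolved.
  CompletableFuture<Void> bundleDone =
      CompletableFuture.allOf(elementFutures.toArray(new CompletableFuture<?>[0]));
  bundleDone.thenRun(() -> invokeFinishBundle());  // placeholder FinishBundle hook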
>>
>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>> don't know if there's much benefit to passing a future in, since the
>>> framework itself could hook up the process function to complete when the
>>> future completes.
>>>
>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>> Beam itself, a lot of built-in transforms do it as well.  Scio provides a
>>> few AsyncDoFn implementations [1], but it'd be great to see this as a
>>> first-class concept in Beam itself.  Doing error handling, concurrency, etc.
>>> correctly can be tricky.
>>>
>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>
>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> If the input is a CompletionStage<InputT> then the output should also
>>>> be a CompletionStage<OutputT>, since all you should do is async chaining.
>>>> We could enforce this by giving the DoFn an
>>>> OutputReceiver(CompletionStage<OutputT>).
>>>>
>>>> Another possibility that might be even more robust against poor future
>>>> use could be process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>>> itself will be async chained, rather than counting on the user to do the
>>>> right thing.
>>>>
>>>> We should see how these look in real use cases. The way that processing
>>>> is split between @ProcessElement and @FinishBundle might complicate things.
>>>>
>>>> Kenn
>>>>

Re: [DISCUSSION] ParDo Async Java API

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
Let me start a separate proposal thread and link this conversation.
Sorry about that.

On Fri, Sep 13, 2019 at 9:31 AM Bharath Kumara Subramanian <
codin.martial@gmail.com> wrote:

> I have put together a design document
> <https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing>
> that consolidates our discussion in this thread.
> Please let me know your thoughts.
>
> Thanks,
> Bharath
>
>
>
> On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <xi...@gmail.com> wrote:
>
>> I put the asks and email discussions in this JIRA to track the Async API:
>> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the
>> SamzaRunner side is willing to take a stab at this. He will come up with
>> some design doc based on our discussions. Will update the thread once it's
>> ready. Really appreciate all the suggestions and feedback here.
>>
>> Thanks,
>> Xinyu
>>
>> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> That's a good point that this "IO" time should be tracked differently.
>>>
>>> For a single level, a wrapper/utility that correctly and completely
>>> (and transparently) implements the "naive" bit I sketched above under
>>> the hood may be sufficient and implementable purely in user-space, and
>>> quite useful.
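
A user-space wrapper along those lines might look roughly like this sketch
(assumed names, in the spirit of Scio's BaseAsyncDoFn linked earlier in the
thread; error handling elided):

  public abstract class AsyncDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
    // One entry per element whose result is still outstanding.
    private transient List<Pending<OutputT>> pending;

    // Subclasses implement the actual asynchronous call.
    protected abstract CompletableFuture<OutputT> processAsync(InputT element);

    @StartBundle
    public void startBundle() {
      pending = new ArrayList<>();
    }

    @ProcessElement
    public void process(@Element InputT element,
                        @Timestamp Instant ts,
                        BoundedWindow window) {
      pending.add(new Pending<>(processAsync(element), ts, window));
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) {
      // Force completion of every outstanding future before the bundle
      // commits, emitting each result with its original timestamp and window.
      for (Pending<OutputT> p : pending) {
        ctx.output(p.future.join(), p.timestamp, p.window);
      }
      pending.clear();
    }

    private static class Pending<T> {
      final CompletableFuture<T> future;
      final Instant timestamp;
      final BoundedWindow window;
      Pending(CompletableFuture<T> f, Instant t, BoundedWindow w) {
        this.future = f; this.timestamp = t; this.window = w;
      }
    }
  }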
>>>
>>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
>>> >
>>> > Makes sense to me. We should make it easier to write DoFns in this
>>> pattern that has emerged as common among I/O connectors.
>>> >
>>> > Enabling asynchronous task chaining across a fusion tree is more
>>> complicated but not necessary for this scenario.
>>> >
>>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>> >>
>>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>>> Finagle, etc.), asynchronous IO is typically completely non-blocking, so
>>> there generally won't be a large number of threads waiting for IO to
>>> complete.  (Netty, for example, uses a small pool of threads for the Event
>>> Loop Group.)
>>> >>
>>> >> But in general I agree with Reuven, runners should not count threads
>>> in use in other thread pools for IO for the purpose of autoscaling (or most
>>> kinds of accounting).
>>> >>
>>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>>> >>>
>>> >>> As Steve said, the main rationale for this is so that asynchronous
>>> IOs (or, in general, asynchronous remote calls) can be made. To some degree
>>> this addresses Scott's concern: the asynchronous threads should be, for the
>>> most part, simply waiting for IOs to complete; the reason to do the waiting
>>> asynchronously is so that the main threadpool does not become blocked,
>>> causing the pipeline to become IO bound. A runner like Dataflow should not
>>> be tracking these threads for the purpose of autoscaling, as adding more
>>> workers will (usually) not cause these calls to complete any faster.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org>
>>> wrote:
>>> >>>>
>>> >>>> I think I agree with a lot of what you said here, I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>> >>>>
>>> >>>> From my usage of Beam, I feel like the big benefit of async DoFns
>>> would be to allow batched IO to be implemented more simply inside a DoFn.
>>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>>> operations in ProcessElement and wait for them to complete in FinishBundle
>>> ([1][2], etc).  From my experience, things like error handling, emitting
>>> outputs as the result of an asynchronous operation completing (in the
>>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>>> would be great for the SDK to provide support natively for it.
>>> >>>>
>>> >>>> It's also probably good to point out that really only DoFns that do
>>> IO should be asynchronous; normal CPU-bound DoFns have no reason to be
>>> asynchronous.
>>> >>>>
>>> >>>> A really good example of this is an IO I had written recently for
>>> Bigtable: it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from Bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo, but this would limit throughput substantially (even assuming
>>> an average read latency of 1 ms, that's still only 1000 QPS per instance
>>> of the ParDo).  My implementation batches many reads together (as they
>>> arrive at the DoFn), executes them once the batch is big enough (or some
>>> time passes), and then emits them once the batch read completes.  Emitting
>>> them in the correct window and handling errors gets tricky, so this is
>>> certainly something I'd love the framework itself to handle.
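
A condensed sketch of that batching pattern (bigtableClient, Row,
timestampFor, and windowFor are stand-ins; the window/timestamp bookkeeping
that makes this tricky is exactly what's elided):

  private static final int MAX_BATCH_SIZE = 100;
  private final List<ByteString> batch = new ArrayList<>();
  private final List<CompletableFuture<List<Row>>> inFlight = new ArrayList<>();

  @ProcessElement
  public void process(@Element ByteString rowKey) {
    batch.add(rowKey);
    if (batch.size() >= MAX_BATCH_SIZE) {
      // One bulk read for the whole batch instead of one RPC per key.
      inFlight.add(bigtableClient.readRowsAsync(new ArrayList<>(batch)));
      batch.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    if (!batch.isEmpty()) {
      inFlight.add(bigtableClient.readRowsAsync(new ArrayList<>(batch)));
      batch.clear();
    }
    // Join every outstanding read before the bundle commits.
    for (CompletableFuture<List<Row>> read : inFlight) {
      read.join().forEach(row -> ctx.output(row, timestampFor(row), windowFor(row)));
    }
    inFlight.clear();
  }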
>>> >>>>
>>> >>>> I also don't see a big benefit of making a DoFn receive a future:
>>> if all a user is ever supposed to do is attach a continuation to it, that
>>> could just as easily be done by the runner itself, basically invoking the
>>> entire ParDo as a continuation on the future (which assumes the runner is
>>> even representing these tasks as futures internally).
>>> >>>>
>>> >>>> Making the DoFn itself actually return a future could be an option;
>>> even if the language doesn't support something like `await`, you could
>>> still implement it yourself in the DoFn. However, it seems like it'd be a
>>> strange contrast to the non-async version, which returns void.
>>> >>>>
>>> >>>> [1]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> >>>> [2]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>> >>>>
>>> >>>>
>>> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <
>>> robertwb@google.com> wrote:
>>> >>>>>
>>> >>>>> If I understand correctly, the end goal is to process input elements
>>> >>>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>> >>>>> DoFns that simply take and produce [Serializable?]CompletionStages as
>>> >>>>> element types, followed by a DoFn that adds a callback to emit on
>>> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>> >>>>> issues) and whose finalize forces all completions. This would, of
>>> >>>>> course, interact poorly with processing-time tracking, fusion breaks,
>>> >>>>> watermark tracking, counter attribution, window propagation, etc., so
>>> >>>>> it is desirable to make it part of the system itself.
>>> >>>>>
>>> >>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>> >>>>> API. The invocation of the downstream process could be chained onto
>>> >>>>> this, with all the implicit tracking and tracing set up correctly.
>>> >>>>> Taking a CompletionStage as input means a DoFn would not have to
>>> >>>>> create its output CompletionStage ex nihilo and possibly allow for
>>> >>>>> better chaining (depending on the asynchronous APIs used).
>>> >>>>>
>>> >>>>> Even better might be to simply let the invocation of all
>>> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>> >>>>> await primitive to relinquish control in the middle of a function body,
>>> >>>>> this might be hard.
>>> >>>>>
>>> >>>>> I think for correctness, completion would have to be forced at the end
>>> >>>>> of each bundle. If your bundles are large enough, this may not be that
>>> >>>>> big of a deal. In this case you could also start executing subsequent
>>> >>>>> bundles while waiting for prior ones to complete.
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>> >>>>> <co...@gmail.com> wrote:
>>> >>>>> >>
>>> >>>>> >> I'd love to see something like this as well.  Also +1 to
>>> process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>>> benefit to passing a future in, since the framework itself could hook up
>>> the process function to complete when the future completes.
>>> >>>>> >
>>> >>>>> >
>>> >>>>> > One benefit we get by wrapping the input with CompletionStage is
>>> to mandate[1] users to chain their processing logic to the input future;
>>> thereby, ensuring asynchrony for the most part. However, it is still
>>> possible for users to go out of their way and write blocking code.
>>> >>>>> >
>>> >>>>> > Although, I am not sure how counter intuitive it is for the
>>> runners to wrap the input element into a future before passing it to the
>>> user code.
>>> >>>>> >
>>> >>>>> > Bharath
>>> >>>>> >
>>> >>>>> > [1] CompletionStage interface does not define methods for
>>> initially creating, forcibly completing normally or exceptionally, probing
>>> completion status or results, or awaiting completion of a stage.
>>> Implementations of CompletionStage may provide means of achieving such
>>> effects, as appropriate
>>> >>>>> >
>>> >>>>> >
>>> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <
>>> kenn@apache.org> wrote:
>>> >>>>> >>
>>> >>>>> >> I think your concerns are valid, but I want to clarify what
>>> "first class async APIs" means. Does "first class" mean that it is a
>>> well-encapsulated abstraction, or does it mean that the user can more or
>>> less do whatever they want? These are opposite but both valid meanings for
>>> "first class", to me.
>>> >>>>> >>
>>> >>>>> >> I would not want to encourage users to do explicit
>>> multi-threaded programming or control parallelism. Part of the point of
>>> Beam is to gain big data parallelism without explicit multithreading. I see
>>> asynchronous chaining of futures (or their best-approximation in your
>>> language of choice) as a highly disciplined way of doing asynchronous
>>> dependency-driven computation that is nonetheless conceptually, and
>>> readably, straight-line code. Threads are not required nor the only way to
>>> execute this code. In fact you might often want to execute without
>>> threading for a reference implementation to provide canonically correct
>>> results. APIs that leak lower-level details of threads are asking for
>>> trouble.
>>> >>>>> >>
>>> >>>>> >> One of our other ideas was to provide a dynamic parameter of
>>> type ExecutorService. The SDK harness (pre-portability: the runner) would
>>> control and observe parallelism while the user could simply register tasks.
>>> Providing a future/promise API is even more disciplined.
>>> >>>>> >>
>>> >>>>> >> Kenn
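
That ExecutorService idea might look something like the following (a purely
hypothetical parameter; no such injection exists in Beam today, and
expensiveCall is a placeholder):

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element InputT element,
                        ExecutorService executor,  // hypothetical: supplied and observed by the SDK harness
                        @Output OutputReceiver<CompletionStage<OutputT>> out) {
      // The user only registers work; the harness controls parallelism.
      out.output(CompletableFuture.supplyAsync(() -> expensiveCall(element), executor));
    }
  }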
>>> >>>>> >>
>>> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
>>> wrote:
>>> >>>>> >>>
>>> >>>>> >>> A related question is how to make execution observable such
>>> that a runner can make proper scaling decisions. Runners decide how to
>>> schedule bundles within and across multiple worker instances, and can use
>>> information about execution to make dynamic scaling decisions. First-class
>>> async APIs seem like they would encourage DoFn authors to implement their
>>> own parallelization, rather than deferring to the runner that should be
>>> more capable of providing the right level of parallelism.
>>> >>>>> >>>
>>> >>>>> >>> In the Dataflow worker harness, we attribute execution time to
>>> PTransform steps by sampling the execution thread and charging each sample
>>> to the currently invoked method. This approach is fairly simple and
>>> possible because we assume that execution happens within the thread
>>> controlled by the runner. Some DoFns already implement their own async
>>> logic and break this assumption; I would expect more to do so if we make
>>> async built into the DoFn APIs.
>>> >>>>> >>>
>>> >>>>> >>> So: this isn't an argument against async APIs, but rather:
>>> does this break execution observability, and are there other lightweight
>>> mechanisms for attributing execution time of async work?
>>> >>>>> >>>
>>> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <
>>> klk@google.com> wrote:
>>> >>>>> >>>>
>>> >>>>> >>>> When executed over the portable APIs, it will be primarily
>>> the Java SDK harness that makes all of these decisions. If we wanted
>>> runners to have some insight into it we would have to add it to the Beam
>>> model protos. I don't have any suggestions there, so I would leave it out
>>> of this discussion until there's good ideas. We could learn a lot by trying
>>> it out just in the SDK harness.
>>> >>>>> >>>>
>>> >>>>> >>>> Kenn
>>> >>>>> >>>>
>>> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
>>> xinyuliu.us@gmail.com> wrote:
>>> >>>>> >>>>>
>>> >>>>> >>>>> I don't have a strong opinion on the resolution of the
>>> futures with respect to @FinishBundle invocation. Leaving it unspecified
>>> does give runners more room to implement it with their own support.
>>> >>>>> >>>>>
>>> >>>>> >>>>> Optimization is also another great point. Fusion seems pretty
>>> complex to me too if we need to find a way to chain the resulting future
>>> into the next transform; perhaps we could leave the async transform as a
>>> standalone stage initially?
>>> >>>>> >>>>>
>>> >>>>> >>>>> Btw, I was counting the number of replies before we hit
>>> portability. Seems after 4 replies, fusion finally showed up :).
>>> >>>>> >>>>>
>>> >>>>> >>>>> Thanks,
>>> >>>>> >>>>> Xinyu
>>> >>>>> >>>>>
>>> >>>>> >>>>>
>>> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <
>>> klk@google.com> wrote:
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com>
>>> wrote:
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>>> xinyuliu.us@gmail.com> wrote:
>>> >>>>> >>>>>>>>
>>> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful
>>> in your use cases as well. Thanks for sharing the code from Scio! I can see
>>> in your implementation that waiting for the future completion is part of
>>> the @FinishBundle. We are thinking of taking advantage of the underlying
>>> runner async support so the user-level code won't need to implement this
>>> logic, e.g. Samza has an AsyncStreamTask API that provides a callback to
>>> invoke after future completion[1], and Flink also has AsyncFunction api [2]
>>> which provides a ResultFuture similar to the API we discussed.
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>>
>>> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the
>>> process dies, can you guarantee that no data is lost? Beam currently
>>> guarantees this for FinishBundle, but if you use an arbitrary async
>>> framework this might not be true.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>>> committed, *then* @FinishBundle has run. So it seems just as easy to say
>>> *if* a bundle is committed, *then* every async result has been resolved.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> If the process dies, the two cases should be naturally
>>> analogous.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> But it raises the question of whether they should be
>>> resolved prior to @FinishBundle, after it, or left unspecified. I lean
>>> toward unspecified.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>>> optimizing fused stages for greater asynchrony.
>>> >>>>> >>>>>>
>>> >>>>> >>>>>> Kenn
Re: [DISCUSSION] ParDo Async Java API

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
I have put together a design document
<https://docs.google.com/document/d/1t--UYXgaij0ULEoXUnhG3r8OZPBljN9r_WWlwQJBDrI/edit?usp=sharing>
that consolidates our discussion in this thread.
Please let me know your thoughts.

Thanks,
Bharath



On Wed, Jan 30, 2019 at 10:18 AM Xinyu Liu <xi...@gmail.com> wrote:

> I put the asks and email discussions in this JIRA to track the Async API:
> https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner
> side is willing to take a stab at this. He will come up with some design
> doc based on our discussions. Will update the thread once it's ready.
> Really appreciate all the suggestions and feedback here.
>
> Thanks,
> Xinyu
>
> On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> That's a good point that this "IO" time should be tracked differently.
>>
>> For a single level, a wrapper/utility that correctly and completely
>> (and transparently) implements the "naive" bit I sketched above under
>> the hood may be sufficient and implementable purely in user-space, and
>> quite useful.
>>
>> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
>> >
>> > Makes sense to me. We should make it easier to write DoFn's in this
>> pattern that has emerged as common among I/O connectors.
>> >
>> > Enabling asynchronous task chaining across a fusion tree is more
>> complicated but not necessary for this scenario.
>> >
>> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>
>> >> It's also important to note that in many (most?) IO frameworks (gRPC,
>> finagle, etc), asynchronous IO is typically completely non-blocking, so
>> there generally won't be a large number of threads waiting for IO to
>> complete.  (netty uses a small pool of threads for the Event Loop Group for
>> example).
>> >>
>> >> But in general I agree with Reuven, runners should not count threads
>> in use in other thread pools for IO for the purpose of autoscaling (or most
>> kinds of accounting).
>> >>
>> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>> >>>
>> >>> As Steve said, the main rationale for this is so that asynchronous
>> IOs (or in general, asynchronous remote calls) call be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>> >>>>
>> >>>> I think I agree with a lot of what you said here, I'm just going to
>> restate my initial use-case to try to make it more clear as well.
>> >>>>
>> >>>> From my usage of beam, I feel like the big benefit of async DoFns
>> would be to allow batched IO to be implemented more simply inside a DoFn.
>> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
>> operations in ProcessElement and wait for them to complete in FinishBundle
>> ([1][2], etc).  From my experience, things like error handling, emitting
>> outputs as the result of an asynchronous operation completing (in the
>> correct window, with the correct timestamp, etc) get pretty tricky, and it
>> would be great for the SDK to provide support natively for it.
>> >>>>
>> >>>> It's also probably good to point out that really only DoFns that do
>> IO should be asynchronous, normal CPU bound DoFns have no reason to be
>> asynchronous.
>> >>>>
>> >>>> A really good example of this is an IO I had written recently for
>> Bigtable, it takes an input PCollection of ByteStrings representing row
>> keys, and returns a PCollection of the row data from bigtable.  Naively
>> this could be implemented by simply blocking on the Bigtable read inside
>> the ParDo, however this would limit throughput substantially (even assuming
>> an avg read latency is 1ms, thats still only 1000 QPS / instance of the
>> ParDo).  My implementation batches many reads together (as they arrive at
>> the DoFn), executes them once the batch is big enough (or some time
>> passes), and then emits them once the batch read completes.  Emitting them
>> in the correct window and handling errors gets tricky, so this is certainly
>> something I'd love the framework itself to handle.
>> >>>>
>> >>>> I also don't see a big benefit of making a DoFn receive a future, if
>> all a user is ever supposed to do is attach a continuation to it, that
>> could just as easily be done by the runner itself, basically just invoking
>> the entire ParDo as a continuation on the future (which then assumes the
>> runner is even representing these tasks as futures internally).
>> >>>>
>> >>>> Making the DoFn itself actually return a future could be an option,
>> even if the language itself doesn't support something like `await`, you
>> could still implement it yourself in the DoFn, however, it seems like it'd
>> be a strange contrast to the non-async version, which returns void.
>> >>>>
>> >>>> [1]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>> >>>> [2]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>> >>>>
>> >>>>
>> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>>>>
>> >>>>> If I understand correctly, the end goal is to process input elements
>> >>>>> of a DoFn asynchronously. Were I to do this naively, I would
>> implement
>> >>>>> DoFns that simply take and receive [Serializable?]CompletionStages
>> as
>> >>>>> element types, followed by a DoFn that adds a callback to emit on
>> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>> >>>>> issues) and whose finalize forces all completions. This would, of
>> >>>>> course, interact poorly with processing time tracking, fusion
>> breaks,
>> >>>>> watermark tracking, counter attribution, window propagation, etc. so
>> >>>>> it is desirable to make it part of the system itself.
>> >>>>>
>> >>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a
>> decent
>> >>>>> API. The invoking of the downstream process could be chained onto
>> >>>>> this, with all the implicit tracking and tracing set up correctly.
>> >>>>> Taking a CompletionStage as input means a DoFn would not have to
>> >>>>> create its output CompletionStage ex nihilo and possibly allow for
>> >>>>> better chaining (depending on the asynchronous APIs used).
>> >>>>>
>> >>>>> Even better might be to simply let the invocation of all
>> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>> >>>>> await primitive to relinquish control in the middle of a function
>> body
>> >>>>> this might be hard.
>> >>>>>
>> >>>>> I think for correctness, completion would have to be forced at the
>> end
>> >>>>> of each bundle. If your bundles are large enough, this may not be
>> that
>> >>>>> big of a deal. In this case you could also start executing
>> subsequent
>> >>>>> bundles while waiting for prior ones to complete.
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>> >>>>> <co...@gmail.com> wrote:
>> >>>>> >>
>> >>>>> >> I'd love to see something like this as well.  Also +1 to
>> process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>> benefit to passing a future in, since the framework itself could hook up
>> the process function to complete when the future completes.
>> >>>>> >
>> >>>>> >
>> >>>>> > One benefit we get by wrapping the input with CompletionStage is
>> to mandate[1] users to chain their processing logic to the input future;
>> thereby, ensuring asynchrony for the most part. However, it is still
>> possible for users to go out of their way and write blocking code.
>> >>>>> >
>> >>>>> > Although, I am not sure how counter intuitive it is for the
>> runners to wrap the input element into a future before passing it to the
>> user code.
>> >>>>> >
>> >>>>> > Bharath
>> >>>>> >
>> >>>>> > [1] CompletionStage interface does not define methods for
>> initially creating, forcibly completing normally or exceptionally, probing
>> completion status or results, or awaiting completion of a stage.
>> Implementations of CompletionStage may provide means of achieving such
>> effects, as appropriate
>> >>>>> >
>> >>>>> >
>> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org>
>> wrote:
>> >>>>> >>
>> >>>>> >> I think your concerns are valid but i want to clarify about
>> "first class async APIs". Does "first class" mean that it is a
>> well-encapsulated abstraction? or does it mean that the user can more or
>> less do whatever they want? These are opposite but both valid meanings for
>> "first class", to me.
>> >>>>> >>
>> >>>>> >> I would not want to encourage users to do explicit
>> multi-threaded programming or control parallelism. Part of the point of
>> Beam is to gain big data parallelism without explicit multithreading. I see
>> asynchronous chaining of futures (or their best-approximation in your
>> language of choice) as a highly disciplined way of doing asynchronous
>> dependency-driven computation that is nonetheless conceptually, and
>> readably, straight-line code. Threads are not required nor the only way to
>> execute this code. In fact you might often want to execute without
>> threading for a reference implementation to provide canonically correct
>> results. APIs that leak lower-level details of threads are asking for
>> trouble.
>> >>>>> >>
>> >>>>> >> One of our other ideas was to provide a dynamic parameter of
>> type ExecutorService. The SDK harness (pre-portability: the runner) would
>> control and observe parallelism while the user could simply register tasks.
>> Providing a future/promise API is even more disciplined.
>> >>>>> >>
>> >>>>> >> Kenn
>> >>>>> >>
>> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
>> wrote:
>> >>>>> >>>
>> >>>>> >>> A related question is how to make execution observable such
>> that a runner can make proper scaling decisions. Runners decide how to
>> schedule bundles within and across multiple worker instances, and can use
>> information about execution to make dynamic scaling decisions. First-class
>> async APIs seem like they would encourage DoFn authors to implement their
>> own parallelization, rather than deferring to the runner that should be
>> more capable of providing the right level of parallelism.
>> >>>>> >>>
>> >>>>> >>> In the Dataflow worker harness, we estimate execution time to
>> PTransform steps by sampling execution time on the execution thread and
>> attributing it to the currently invoked method. This approach is fairly
>> simple and possible because we assume that execution happens within the
>> thread controlled by the runner. Some DoFn's already implement their own
>> async logic and break this assumption; I would expect more if we make async
>> built into the DoFn APIs.
>> >>>>> >>>
>> >>>>> >>> So: this isn't an argument against async APIs, but rather: does
>> this break execution observability, and are there other lightweight
>> mechanisms for attributing execution time of async work?
>> >>>>> >>>
>> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <kl...@google.com>
>> wrote:
>> >>>>> >>>>
>> >>>>> >>>> When executed over the portable APIs, it will be primarily the
>> Java SDK harness that makes all of these decisions. If we wanted runners to
>> have some insight into it we would have to add it to the Beam model protos.
>> I don't have any suggestions there, so I would leave it out of this
>> discussion until there's good ideas. We could learn a lot by trying it out
>> just in the SDK harness.
>> >>>>> >>>>
>> >>>>> >>>> Kenn
>> >>>>> >>>>
>> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
>> xinyuliu.us@gmail.com> wrote:
>> >>>>> >>>>>
>> >>>>> >>>>> I don't have a strong opinion on the resolution of the
>> futures regarding to @FinishBundle invocation. Leaving it to be unspecified
>> does give runners more room to implement it with their own support.
>> >>>>> >>>>>
>> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty
>> complex to me too if we need to find a way to chain the resulting future
>> into the next transform, or leave the async transform as a standalone stage
>> initially?
>> >>>>> >>>>>
>> >>>>> >>>>> Btw, I was counting the number of replies before we hit the
>> portability. Seems after 4 replies fuse finally showed up :).
>> >>>>> >>>>>
>> >>>>> >>>>> Thanks,
>> >>>>> >>>>> Xinyu
>> >>>>> >>>>>
>> >>>>> >>>>>
>> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <
>> klk@google.com> wrote:
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com
>> wrote:
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>> xinyuliu.us@gmail.com> wrote:
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful
>> in your use cases as well. Thanks for sharing the code from Scio! I can see
>> in your implementation that waiting for the future completion is part of
>> the @FinishBundle. We are thinking of taking advantage of the underlying
>> runner async support so the user-level code won't need to implement this
>> logic, e.g. Samza has an AsyncSteamTask api that provides a callback to
>> invoke after future completion[1], and Flink also has AsyncFunction api [2]
>> which provides a ResultFuture similar to the API we discussed.
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the
>> process dies, can you guarantee that no data is lost? Beam currently
>> guarantees this for FinishBundle, but if you use an arbitrary async
>> framework this might not be true.
>> >>>>> >>>>>>
>> >>>>> >>>>>>
>> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>> committed, *then* finishbundle has run. So it seems just as easy to say
>> *if* a bundle is committed, *then* every async result has been resolved.
>> >>>>> >>>>>>
>> >>>>> >>>>>> If the process dies the two cases should be naturally
>> analogous.
>> >>>>> >>>>>>
>> >>>>> >>>>>> But it raises the question of whether they should be
>> resolved prior to finishbundle, after, or unspecified. I lean toward
>> unspecified.
>> >>>>> >>>>>>
>> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>> optimizing fused stages for greater asynchrony.
>> >>>>> >>>>>>
>> >>>>> >>>>>> Kenn
>> >>>>> >>>>>>
>> >>>>> >>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable
>> asynchronously in user's own executor. The following code illustrates
>> Kenn's option #2, with a very simple single-thread pool being the executor:
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>> >>>>> >>>>>>>>   @ProcessElement
>> >>>>> >>>>>>>>   public void process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>> >>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>> CompletableFuture.supplyAsync(
>> >>>>> >>>>>>>>         () -> someOutput,
>> >>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>> >>>>> >>>>>>>>     outputReceiver.output(future);
>> >>>>> >>>>>>>>   }
>> >>>>> >>>>>>>> }
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> The neat thing about this API is that the user can choose
>> their own async framework and we only expect the output to be a
>> CompletionStage.
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> For the implementation of bundling, can we compose a
>> CompletableFuture from each element in the bundle, e.g.
>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>> future is complete? Seems this might work.
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> Thanks,
>> >>>>> >>>>>>>> Xinyu
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> >>>>> >>>>>>>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>> >>>>> >>>>>>>>
>> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>> sniemitz@apache.org> wrote:
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>> process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>> benefit to passing a future in, since the framework itself could hook up
>> the process function to complete when the future completes.
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very
>> similar "kick off a future in ProcessElement, join it in FinishBundle"
>> code, and looking around beam itself a lot of built-in transforms do it as
>> well.  Scio provides a few AsyncDoFn implementations [1] but it'd be great
>> to see this as a first-class concept in beam itself.  Doing error handling,
>> concurrency, etc correctly can be tricky.
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> [1]
>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>> >>>>> >>>>>>>>>
>> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>> klk@google.com> wrote:
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the
>> output should also be a CompletionStage<OutputT>, since all you should do
>> is async chaining. We could enforce this by giving the DoFn an
>> OutputReceiver(CompletionStage<OutputT>).
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> Another possibility that might be even more robust
>> against poor future use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way
>> that processing is split between @ProcessElement and @FinishBundle might
>> complicate things.
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> Kenn
>> >>>>> >>>>>>>>>>
>> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>> xinyuliu.us@gmail.com> wrote:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> Hi, guys,
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner,
>> we got a lot of asks for an asynchronous processing API. There are a few
>> reasons for these asks:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> The users here are experienced in asynchronous
>> programming. With async frameworks such as Netty and ParSeq and libs like
>> async jersey client, they are able to make remote calls efficiently and the
>> libraries help manage the execution threads underneath. Async remote calls
>> are very common in most of our streaming applications today.
>> >>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async
>> processing helps for less resource usage and fast computation (less context
>> switch).
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> I asked about the async support in a previous email
>> thread. The following API was mentioned in the reply:
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>> >>>>> >>>>>>>>>>>     @ProcessElement
>> >>>>> >>>>>>>>>>>     public void process(@Element
>> CompletionStage<InputT> element, ...) {
>> >>>>> >>>>>>>>>>>       element.thenApply(...)
>> >>>>> >>>>>>>>>>>     }
>> >>>>> >>>>>>>>>>>   }
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> We are wondering whether there are any discussions on
>> this API and related docs. It is awesome that you guys already considered
>> having DoFn to process asynchronously. Out of curiosity, this API seems to
>> create a CompletionState out of the input element (probably using
>> framework's executor) and then allow user to chain on it. To us, it seems
>> more convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>> CompletionStage<OutputT> to invoke upon completion.
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> We would like to discuss further on the async API and
>> hopefully we will have a great support in Beam. Really appreciate the
>> feedback!
>> >>>>> >>>>>>>>>>>
>> >>>>> >>>>>>>>>>> Thanks,
>> >>>>> >>>>>>>>>>> Xinyu
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>> --
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>>
>> >>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>> >
>> >
>> >
>> > --
>> >
>> >
>> >
>> >
>> > Got feedback? tinyurl.com/swegner-feedback
>>
>

Re: [DISCUSSION] ParDo Async Java API

Posted by Xinyu Liu <xi...@gmail.com>.
I put the asks and email discussions in this JIRA to track the Async API:
https://jira.apache.org/jira/browse/BEAM-6550. Bharath on the SamzaRunner
side is willing to take a stab at this. He will come up with some design
doc based on our discussions. Will update the thread once it's ready.
Really appreciate all the suggestions and feedback here.

Thanks,
Xinyu

On Thu, Jan 24, 2019 at 2:41 PM Robert Bradshaw <ro...@google.com> wrote:

> That's a good point that this "IO" time should be tracked differently.
>
> For a single level, a wrapper/utility that correctly and completely
> (and transparently) implements the "naive" bit I sketched above under
> the hood may be sufficient and implementable purely in user-space, and
> quite useful.
>
> On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
> >
> > Makes sense to me. We should make it easier to write DoFn's in this
> pattern that has emerged as common among I/O connectors.
> >
> > Enabling asynchronous task chaining across a fusion tree is more
> complicated but not necessary for this scenario.
> >
> > On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sn...@apache.org>
> wrote:
> >>
> >> It's also important to note that in many (most?) IO frameworks (gRPC,
> finagle, etc), asynchronous IO is typically completely non-blocking, so
> there generally won't be a large number of threads waiting for IO to
> complete.  (netty uses a small pool of threads for the Event Loop Group for
> example).
> >>
> >> But in general I agree with Reuven, runners should not count threads in
> use in other thread pools for IO for the purpose of autoscaling (or most
> kinds of accounting).
> >>
> >> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
> >>>
> >>> As Steve said, the main rationale for this is so that asynchronous IOs
> (or in general, asynchronous remote calls) call be made. To some degree
> this addresses Scott's concern: the asynchronous threads should be, for the
> most part, simply waiting for IOs to complete; the reason to do the waiting
> asynchronously is so that the main threadpool does not become blocked,
> causing the pipeline to become IO bound. A runner like Dataflow should not
> be tracking these threads for the purpose of autoscaling, as adding more
> workers will (usually) not cause these calls to complete any faster.
> >>>
> >>> Reuven
> >>>
> >>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org>
> wrote:
> >>>>
> >>>> I think I agree with a lot of what you said here, I'm just going to
> restate my initial use-case to try to make it more clear as well.
> >>>>
> >>>> From my usage of beam, I feel like the big benefit of async DoFns
> would be to allow batched IO to be implemented more simply inside a DoFn.
> Even in the Beam SDK itself, there are a lot of IOs that batch up IO
> operations in ProcessElement and wait for them to complete in FinishBundle
> ([1][2], etc).  From my experience, things like error handling, emitting
> outputs as the result of an asynchronous operation completing (in the
> correct window, with the correct timestamp, etc) get pretty tricky, and it
> would be great for the SDK to provide support natively for it.
> >>>>
> >>>> It's also probably good to point out that really only DoFns that do
> IO should be asynchronous, normal CPU bound DoFns have no reason to be
> asynchronous.
> >>>>
> >>>> A really good example of this is an IO I had written recently for
> Bigtable, it takes an input PCollection of ByteStrings representing row
> keys, and returns a PCollection of the row data from bigtable.  Naively
> this could be implemented by simply blocking on the Bigtable read inside
> the ParDo, however this would limit throughput substantially (even assuming
> an avg read latency is 1ms, thats still only 1000 QPS / instance of the
> ParDo).  My implementation batches many reads together (as they arrive at
> the DoFn), executes them once the batch is big enough (or some time
> passes), and then emits them once the batch read completes.  Emitting them
> in the correct window and handling errors gets tricky, so this is certainly
> something I'd love the framework itself to handle.
> >>>>
> >>>> I also don't see a big benefit of making a DoFn receive a future, if
> all a user is ever supposed to do is attach a continuation to it, that
> could just as easily be done by the runner itself, basically just invoking
> the entire ParDo as a continuation on the future (which then assumes the
> runner is even representing these tasks as futures internally).
> >>>>
> >>>> Making the DoFn itself actually return a future could be an option,
> even if the language itself doesn't support something like `await`, you
> could still implement it yourself in the DoFn, however, it seems like it'd
> be a strange contrast to the non-async version, which returns void.
> >>>>
> >>>> [1]
> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
> >>>> [2]
> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
> >>>>
> >>>>
> >>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <ro...@google.com>
> wrote:
> >>>>>
> >>>>> If I understand correctly, the end goal is to process input elements
> >>>>> of a DoFn asynchronously. Were I to do this naively, I would
> implement
> >>>>> DoFns that simply take and receive [Serializable?]CompletionStages as
> >>>>> element types, followed by a DoFn that adds a callback to emit on
> >>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
> >>>>> issues) and whose finalize forces all completions. This would, of
> >>>>> course, interact poorly with processing time tracking, fusion breaks,
> >>>>> watermark tracking, counter attribution, window propagation, etc. so
> >>>>> it is desirable to make it part of the system itself.
> >>>>>
> >>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
> >>>>> API. The invoking of the downstream process could be chained onto
> >>>>> this, with all the implicit tracking and tracing set up correctly.
> >>>>> Taking a CompletionStage as input means a DoFn would not have to
> >>>>> create its output CompletionStage ex nihilo and possibly allow for
> >>>>> better chaining (depending on the asynchronous APIs used).
> >>>>>
> >>>>> Even better might be to simply let the invocation of all
> >>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
> >>>>> await primitive to relinquish control in the middle of a function
> body, this might be hard.
> >>>>>
> >>>>> I think for correctness, completion would have to be forced at the
> end
> >>>>> of each bundle. If your bundles are large enough, this may not be
> that
> >>>>> big of a deal. In this case you could also start executing subsequent
> >>>>> bundles while waiting for prior ones to complete.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
> >>>>> <co...@gmail.com> wrote:
> >>>>> >>
> >>>>> >> I'd love to see something like this as well.  Also +1 to
> process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
> benefit to passing a future in, since the framework itself could hook up
> the process function to complete when the future completes.
> >>>>> >
> >>>>> >
> >>>>> > One benefit we get by wrapping the input with CompletionStage is
> to mandate[1] that users chain their processing logic to the input future;
> thereby ensuring asynchrony for the most part. However, it is still
> possible for users to go out of their way and write blocking code.
> >>>>> >
> >>>>> > Although, I am not sure how counterintuitive it is for the
> runners to wrap the input element into a future before passing it to the
> user code.
> >>>>> >
> >>>>> > Bharath
> >>>>> >
> >>>>> > [1] CompletionStage interface does not define methods for
> initially creating, forcibly completing normally or exceptionally, probing
> completion status or results, or awaiting completion of a stage.
> Implementations of CompletionStage may provide means of achieving such
> effects, as appropriate
> >>>>> >
> >>>>> >
> >>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org>
> wrote:
> >>>>> >>
> >>>>> >> I think your concerns are valid but I want to clarify about
> "first class async APIs". Does "first class" mean that it is a
> well-encapsulated abstraction? or does it mean that the user can more or
> less do whatever they want? These are opposite but both valid meanings for
> "first class", to me.
> >>>>> >>
> >>>>> >> I would not want to encourage users to do explicit multi-threaded
> programming or control parallelism. Part of the point of Beam is to gain
> big data parallelism without explicit multithreading. I see asynchronous
> chaining of futures (or their best-approximation in your language of
> choice) as a highly disciplined way of doing asynchronous dependency-driven
> computation that is nonetheless conceptually, and readably, straight-line
> code. Threads are not required nor the only way to execute this code. In
> fact you might often want to execute without threading for a reference
> implementation to provide canonically correct results. APIs that leak
> lower-level details of threads are asking for trouble.
> >>>>> >>
> >>>>> >> One of our other ideas was to provide a dynamic parameter of type
> ExecutorService. The SDK harness (pre-portability: the runner) would
> control and observe parallelism while the user could simply register tasks.
> Providing a future/promise API is even more disciplined.
> >>>>> >>
> >>>>> >> Kenn
> >>>>> >>
> >>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
> wrote:
> >>>>> >>>
> >>>>> >>> A related question is how to make execution observable such that
> a runner can make proper scaling decisions. Runners decide how to schedule
> bundles within and across multiple worker instances, and can use
> information about execution to make dynamic scaling decisions. First-class
> async APIs seem like they would encourage DoFn authors to implement their
> own parallelization, rather than deferring to the runner that should be
> more capable of providing the right level of parallelism.
> >>>>> >>>
> >>>>> >>> In the Dataflow worker harness, we estimate execution time for
> PTransform steps by sampling execution time on the execution thread and
> attributing it to the currently invoked method. This approach is fairly
> simple and possible because we assume that execution happens within the
> thread controlled by the runner. Some DoFns already implement their own
> async logic and break this assumption; I would expect more if we make async
> built into the DoFn APIs.
> >>>>> >>>
> >>>>> >>> So: this isn't an argument against async APIs, but rather: does
> this break execution observability, and are there other lightweight
> mechanisms for attributing execution time of async work?
> >>>>> >>>
> >>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <kl...@google.com>
> wrote:
> >>>>> >>>>
> >>>>> >>>> When executed over the portable APIs, it will be primarily the
> Java SDK harness that makes all of these decisions. If we wanted runners to
> have some insight into it we would have to add it to the Beam model protos.
> I don't have any suggestions there, so I would leave it out of this
> discussion until there's good ideas. We could learn a lot by trying it out
> just in the SDK harness.
> >>>>> >>>>
> >>>>> >>>> Kenn
> >>>>> >>>>
> >>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <
> xinyuliu.us@gmail.com> wrote:
> >>>>> >>>>>
> >>>>> >>>>> I don't have a strong opinion on the resolution of the futures
> with regard to the @FinishBundle invocation. Leaving it unspecified does
> give runners more room to implement it with their own support.
> >>>>> >>>>>
> >>>>> >>>>> Optimization is also another great point. Fuse seems pretty
> complex to me too if we need to find a way to chain the resulting future
> into the next transform, or leave the async transform as a standalone stage
> initially?
> >>>>> >>>>>
> >>>>> >>>>> Btw, I was counting the number of replies before we hit the
> portability. Seems after 4 replies fuse finally showed up :).
> >>>>> >>>>>
> >>>>> >>>>> Thanks,
> >>>>> >>>>> Xinyu
> >>>>> >>>>>
> >>>>> >>>>>
> >>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <
> klk@google.com> wrote:
> >>>>> >>>>>>
> >>>>> >>>>>>
> >>>>> >>>>>>
> >>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com>
> wrote:
> >>>>> >>>>>>>
> >>>>> >>>>>>>
> >>>>> >>>>>>>
> >>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
> xinyuliu.us@gmail.com> wrote:
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in
> your use cases as well. Thanks for sharing the code from Scio! I can see in
> your implementation that waiting for the future completion is part of the
> @FinishBundle. We are thinking of taking advantage of the underlying runner
> async support so the user-level code won't need to implement this logic,
> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
> after future completion[1], and Flink also has AsyncFunction API [2] which
> provides a ResultFuture similar to the API we discussed.
> >>>>> >>>>>>>
> >>>>> >>>>>>>
> >>>>> >>>>>>> Can this be done correctly? What I mean is that if the
> process dies, can you guarantee that no data is lost? Beam currently
> guarantees this for FinishBundle, but if you use an arbitrary async
> framework this might not be true.
> >>>>> >>>>>>
> >>>>> >>>>>>
> >>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
> committed, *then* finishbundle has run. So it seems just as easy to say
> *if* a bundle is committed, *then* every async result has been resolved.
> >>>>> >>>>>>
> >>>>> >>>>>> If the process dies the two cases should be naturally
> analogous.
> >>>>> >>>>>>
> >>>>> >>>>>> But it raises the question of whether they should be resolved
> prior to finishbundle, after, or unspecified. I lean toward unspecified.
> >>>>> >>>>>>
> >>>>> >>>>>> That's for a single ParDo. Where this could get complex is
> optimizing fused stages for greater asynchrony.
> >>>>> >>>>>>
> >>>>> >>>>>> Kenn
> >>>>> >>>>>>
> >>>>> >>>>>>>
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> A simple use case for this is to execute a Runnable
> asynchronously in user's own executor. The following code illustrates
> Kenn's option #2, with a very simple single-thread pool being the executor:
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
> >>>>> >>>>>>>>   @ProcessElement
> >>>>> >>>>>>>>   public void process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
> >>>>> >>>>>>>>     CompletableFuture<OutputT> future =
> CompletableFuture.supplyAsync(
> >>>>> >>>>>>>>         () -> someOutput,
> >>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
> >>>>> >>>>>>>>     outputReceiver.output(future);
> >>>>> >>>>>>>>   }
> >>>>> >>>>>>>> }
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> The neat thing about this API is that the user can choose
> their own async framework and we only expect the output to be a
> CompletionStage.
> >>>>> >>>>>>>>
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> For the implementation of bundling, can we compose a
> CompletableFuture from each element in the bundle, e.g.
> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
> future is complete? Seems this might work.
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> Thanks,
> >>>>> >>>>>>>> Xinyu
> >>>>> >>>>>>>>
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> [1]
> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
> >>>>> >>>>>>>> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
> >>>>> >>>>>>>>
> >>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
> sniemitz@apache.org> wrote:
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
> process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
> benefit to passing a future in, since the framework itself could hook up
> the process function to complete when the future completes.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very
> similar "kick off a future in ProcessElement, join it in FinishBundle"
> code, and looking around Beam itself, a lot of built-in transforms do it as
> well.  Scio provides a few AsyncDoFn implementations [1] but it'd be great
> to see this as a first-class concept in Beam itself.  Doing error handling,
> concurrency, etc. correctly can be tricky.
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
> >>>>> >>>>>>>>>
> >>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
> klk@google.com> wrote:
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output
> should also be a CompletionStage<OutputT>, since all you should do is async
> chaining. We could enforce this by giving the DoFn an
> OutputReceiver<CompletionStage<OutputT>>.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> Another possibility that might be even more robust
> against poor future use could be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> We should see how these look in real use cases. The way
> that processing is split between @ProcessElement and @FinishBundle might
> complicate things.
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> Kenn
> >>>>> >>>>>>>>>>
> >>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
> xinyuliu.us@gmail.com> wrote:
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> Hi, guys,
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner,
> we got a lot of asks for an asynchronous processing API. There are a few
> reasons for these asks:
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> The users here are experienced in asynchronous
> programming. With async frameworks such as Netty and ParSeq and libs like
> async jersey client, they are able to make remote calls efficiently and the
> libraries help manage the execution threads underneath. Async remote calls
> are very common in most of our streaming applications today.
> >>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async
> processing helps for less resource usage and fast computation (less context
> switch).
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> I asked about the async support in a previous email
> thread. The following API was mentioned in the reply:
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
> >>>>> >>>>>>>>>>>     @ProcessElement
> >>>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
> element, ...) {
> >>>>> >>>>>>>>>>>       element.thenApply(...)
> >>>>> >>>>>>>>>>>     }
> >>>>> >>>>>>>>>>>   }
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> We are wondering whether there are any discussions on
> this API and related docs. It is awesome that you guys already considered
> having DoFn to process asynchronously. Out of curiosity, this API seems to
> create a CompletionStage out of the input element (probably using
> framework's executor) and then allow user to chain on it. To us, it seems
> more convenient if the DoFn output a CompletionStage<OutputT> or pass in a
> CompletionStage<OutputT> to invoke upon completion.
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> We would like to discuss further on the async API and
> hopefully we will have a great support in Beam. Really appreciate the
> feedback!
> >>>>> >>>>>>>>>>>
> >>>>> >>>>>>>>>>> Thanks,
> >>>>> >>>>>>>>>>> Xinyu
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>> --
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>>
> >>>>> >>> Got feedback? tinyurl.com/swegner-feedback
> >
> >
> >
> > --
> >
> >
> >
> >
> > Got feedback? tinyurl.com/swegner-feedback
>

Re: [DISCUSSION] ParDo Async Java API

Posted by Robert Bradshaw <ro...@google.com>.
That's a good point that this "IO" time should be tracked differently.

For a single level, a wrapper/utility that correctly, completely, and
transparently implements the "naive" approach I sketched above under the
hood may be sufficient, implementable purely in user space, and quite
useful.
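
For illustration, here is a minimal sketch of what such a user-space wrapper
could look like, assuming the asynchronous work yields a CompletionStage per
element (AsyncWrapperDoFn and processAsync are hypothetical names, and the
Beam SDK / java.util.concurrent imports are elided):

  abstract class AsyncWrapperDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
    // Pending results, plus the metadata needed to emit them later.
    private transient List<CompletionStage<OutputT>> futures;
    private transient List<Instant> timestamps;
    private transient List<BoundedWindow> windows;

    // Subclasses kick off the asynchronous work here.
    protected abstract CompletionStage<OutputT> processAsync(InputT element);

    @StartBundle
    public void startBundle() {
      futures = new ArrayList<>();
      timestamps = new ArrayList<>();
      windows = new ArrayList<>();
    }

    @ProcessElement
    public void process(
        @Element InputT element, @Timestamp Instant ts, BoundedWindow window) {
      futures.add(processAsync(element));
      timestamps.add(ts);
      windows.add(window);
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) throws Exception {
      // Force every completion before the bundle can commit; failures of the
      // asynchronous work surface here and fail the bundle.
      for (int i = 0; i < futures.size(); i++) {
        ctx.output(
            futures.get(i).toCompletableFuture().get(),
            timestamps.get(i),
            windows.get(i));
      }
    }
  }

CompletableFuture.allOf(...) could equally be used to join everything in one
shot; either way the blocking happens once per bundle, which is what keeps
this workable as a pure user-space utility.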

On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
>
> Makes sense to me. We should make it easier to write DoFns in this pattern that has emerged as common among I/O connectors.
>
> Enabling asynchronous task chaining across a fusion tree is more complicated but not necessary for this scenario.
>
> On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sn...@apache.org> wrote:
>>
>> It's also important to note that in many (most?) IO frameworks (gRPC, Finagle, etc.), asynchronous IO is typically completely non-blocking, so there generally won't be a large number of threads waiting for IO to complete.  (Netty, for example, uses a small pool of threads for its event loop group.)
>>
>> But in general I agree with Reuven: runners should not count threads in use in other thread pools for IO for the purpose of autoscaling (or most kinds of accounting).
>>
>> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>>>
>>> As Steve said, the main rationale for this is so that asynchronous IOs (or in general, asynchronous remote calls) can be made. To some degree this addresses Scott's concern: the asynchronous threads should be, for the most part, simply waiting for IOs to complete; the reason to do the waiting asynchronously is so that the main threadpool does not become blocked, causing the pipeline to become IO bound. A runner like Dataflow should not be tracking these threads for the purpose of autoscaling, as adding more workers will (usually) not cause these calls to complete any faster.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org> wrote:
>>>>
>>>> I think I agree with a lot of what you said here; I'm just going to restate my initial use-case to try to make it more clear as well.
>>>>
>>>> From my usage of Beam, I feel like the big benefit of async DoFns would be to allow batched IO to be implemented more simply inside a DoFn.  Even in the Beam SDK itself, there are a lot of IOs that batch up IO operations in ProcessElement and wait for them to complete in FinishBundle ([1][2], etc).  From my experience, things like error handling, emitting outputs as the result of an asynchronous operation completing (in the correct window, with the correct timestamp, etc) get pretty tricky, and it would be great for the SDK to provide support natively for it.
>>>>
>>>> It's also probably good to point out that really only DoFns that do IO should be asynchronous; normal CPU-bound DoFns have no reason to be asynchronous.
>>>>
>>>> A really good example of this is an IO I had written recently for Bigtable: it takes an input PCollection of ByteStrings representing row keys, and returns a PCollection of the row data from Bigtable.  Naively this could be implemented by simply blocking on the Bigtable read inside the ParDo; however, this would limit throughput substantially (even assuming an avg read latency of 1ms, that's still only 1000 QPS / instance of the ParDo).  My implementation batches many reads together (as they arrive at the DoFn), executes them once the batch is big enough (or some time passes), and then emits them once the batch read completes.  Emitting them in the correct window and handling errors gets tricky, so this is certainly something I'd love the framework itself to handle.
>>>>
>>>> I also don't see a big benefit of making a DoFn receive a future: if all a user is ever supposed to do is attach a continuation to it, that could just as easily be done by the runner itself, basically just invoking the entire ParDo as a continuation on the future (which then assumes the runner is even representing these tasks as futures internally).
>>>>
>>>> Making the DoFn itself actually return a future could be an option; even if the language itself doesn't support something like `await`, you could still implement it yourself in the DoFn. However, it seems like it'd be a strange contrast to the non-async version, which returns void.
>>>>
>>>> [1] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>>> [2] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>>
>>>>
>>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <ro...@google.com> wrote:
>>>>>
>>>>> If I understand correctly, the end goal is to process input elements
>>>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>>>> element types, followed by a DoFn that adds a callback to emit on
>>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>>>> issues) and whose finalize forces all completions. This would, of
>>>>> course, interact poorly with processing time tracking, fusion breaks,
>>>>> watermark tracking, counter attribution, window propagation, etc. so
>>>>> it is desirable to make it part of the system itself.
>>>>>
>>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>>>> API. The invoking of the downstream process could be chained onto
>>>>> this, with all the implicit tracking and tracing set up correctly.
>>>>> Taking a CompletionStage as input means a DoFn would not have to
>>>>> create its output CompletionStage ex nihilo and possibly allow for
>>>>> better chaining (depending on the asynchronous APIs used).
>>>>>
>>>>> Even better might be to simply let the invocation of all
>>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>>>> await primitive to relinquish control in the middle of a function body,
>>>>> this might be hard.
>>>>>
>>>>> I think for correctness, completion would have to be forced at the end
>>>>> of each bundle. If your bundles are large enough, this may not be that
>>>>> big of a deal. In this case you could also start executing subsequent
>>>>> bundles while waiting for prior ones to complete.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>>>> <co...@gmail.com> wrote:
>>>>> >>
>>>>> >> I'd love to see something like this as well.  Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>>>> >
>>>>> >
>>>>> > One benefit we get by wrapping the input with CompletionStage is to mandate[1] that users chain their processing logic to the input future; thereby ensuring asynchrony for the most part. However, it is still possible for users to go out of their way and write blocking code.
>>>>> >
>>>>> > Although, I am not sure how counterintuitive it is for the runners to wrap the input element into a future before passing it to the user code.
>>>>> >
>>>>> > Bharath
>>>>> >
>>>>> > [1] CompletionStage interface does not define methods for initially creating, forcibly completing normally or exceptionally, probing completion status or results, or awaiting completion of a stage. Implementations of CompletionStage may provide means of achieving such effects, as appropriate
>>>>> >
>>>>> >
>>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org> wrote:
>>>>> >>
>>>>> >> I think your concerns are valid but I want to clarify about "first class async APIs". Does "first class" mean that it is a well-encapsulated abstraction? or does it mean that the user can more or less do whatever they want? These are opposite but both valid meanings for "first class", to me.
>>>>> >>
>>>>> >> I would not want to encourage users to do explicit multi-threaded programming or control parallelism. Part of the point of Beam is to gain big data parallelism without explicit multithreading. I see asynchronous chaining of futures (or their best-approximation in your language of choice) as a highly disciplined way of doing asynchronous dependency-driven computation that is nonetheless conceptually, and readably, straight-line code. Threads are not required nor the only way to execute this code. In fact you might often want to execute without threading for a reference implementation to provide canonically correct results. APIs that leak lower-level details of threads are asking for trouble.
>>>>> >>
>>>>> >> One of our other ideas was to provide a dynamic parameter of type ExecutorService. The SDK harness (pre-portability: the runner) would control and observe parallelism while the user could simply register tasks. Providing a future/promise API is even more disciplined.
>>>>> >>
>>>>> >> Kenn
>>>>> >>
>>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>>>>> >>>
>>>>> >>> A related question is how to make execution observable such that a runner can make proper scaling decisions. Runners decide how to schedule bundles within and across multiple worker instances, and can use information about execution to make dynamic scaling decisions. First-class async APIs seem like they would encourage DoFn authors to implement their own parallelization, rather than deferring to the runner that should be more capable of providing the right level of parallelism.
>>>>> >>>
>>>>> >>> In the Dataflow worker harness, we estimate execution time for PTransform steps by sampling execution time on the execution thread and attributing it to the currently invoked method. This approach is fairly simple and possible because we assume that execution happens within the thread controlled by the runner. Some DoFns already implement their own async logic and break this assumption; I would expect more if we make async built into the DoFn APIs.
>>>>> >>>
>>>>> >>> So: this isn't an argument against async APIs, but rather: does this break execution observability, and are there other lightweight mechanisms for attributing execution time of async work?
>>>>> >>>
>>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <kl...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> When executed over the portable APIs, it will be primarily the Java SDK harness that makes all of these decisions. If we wanted runners to have some insight into it we would have to add it to the Beam model protos. I don't have any suggestions there, so I would leave it out of this discussion until there's good ideas. We could learn a lot by trying it out just in the SDK harness.
>>>>> >>>>
>>>>> >>>> Kenn
>>>>> >>>>
>>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>>> >>>>>
>>>>> >>>>> I don't have a strong opinion on the resolution of the futures with regard to the @FinishBundle invocation. Leaving it unspecified does give runners more room to implement it with their own support.
>>>>> >>>>>
>>>>> >>>>> Optimization is also another great point. Fuse seems pretty complex to me too if we need to find a way to chain the resulting future into the next transform, or leave the async transform as a standalone stage initially?
>>>>> >>>>>
>>>>> >>>>> Btw, I was counting the number of replies before we hit the portability. Seems after 4 replies fuse finally showed up :).
>>>>> >>>>>
>>>>> >>>>> Thanks,
>>>>> >>>>> Xinyu
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <kl...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in your use cases as well. Thanks for sharing the code from Scio! I can see in your implementation that waiting for the future completion is part of the @FinishBundle. We are thinking of taking advantage of the underlying runner async support so the user-level code won't need to implement this logic, e.g. Samza has an AsyncStreamTask API that provides a callback to invoke after future completion[1], and Flink also has AsyncFunction API [2] which provides a ResultFuture similar to the API we discussed.
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> Can this be done correctly? What I mean is that if the process dies, can you guarantee that no data is lost? Beam currently guarantees this for FinishBundle, but if you use an arbitrary async framework this might not be true.
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is committed, *then* finishbundle has run. So it seems just as easy to say *if* a bundle is committed, *then* every async result has been resolved.
>>>>> >>>>>>
>>>>> >>>>>> If the process dies the two cases should be naturally analogous.
>>>>> >>>>>>
>>>>> >>>>>> But it raises the question of whether they should be resolved prior to finishbundle, after, or unspecified. I lean toward unspecified.
>>>>> >>>>>>
>>>>> >>>>>> That's for a single ParDo. Where this could get complex is optimizing fused stages for greater asynchrony.
>>>>> >>>>>>
>>>>> >>>>>> Kenn
>>>>> >>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> A simple use case for this is to execute a Runnable asynchronously in user's own executor. The following code illustrates Kenn's option #2, with a very simple single-thread pool being the executor:
>>>>> >>>>>>>>
>>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>   @ProcessElement
>>>>> >>>>>>>>   public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>> >>>>>>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>>> >>>>>>>>         () -> someOutput,
>>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>>>> >>>>>>>>     outputReceiver.output(future);
>>>>> >>>>>>>>   }
>>>>> >>>>>>>> }
>>>>> >>>>>>>>
>>>>> >>>>>>>> The neat thing about this API is that the user can choose their own async framework and we only expect the output to be a CompletionStage.
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks,
>>>>> >>>>>>>> Xinyu
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>> >>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sn...@apache.org> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well.  Scio provides a few AsyncDoFn implementations [1] but it'd be great to see this as a first-class concept in Beam itself.  Doing error handling, concurrency, etc. correctly can be tricky.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output should also be a CompletionStage<OutputT>, since all you should do is async chaining. We could enforce this by giving the DoFn an OutputReceiver<CompletionStage<OutputT>>.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Another possibility that might be even more robust against poor future use could be process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). In this way, the process method itself will be async chained, rather than counting on the user to do the right thing.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> We should see how these look in real use cases. The way that processing is split between @ProcessElement and @FinishBundle might complicate things.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Kenn
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com> wrote:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Hi, guys,
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we got a lot of asks for an asynchronous processing API. There are a few reasons for these asks:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> The users here are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq and libs like async jersey client, they are able to make remote calls efficiently and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today.
>>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async processing helps for less resource usage and fast computation (less context switch).
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> I asked about the async support in a previous email thread. The following API was mentioned in the reply:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>>>>     @ProcessElement
>>>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>> >>>>>>>>>>>       element.thenApply(...)
>>>>> >>>>>>>>>>>     }
>>>>> >>>>>>>>>>>   }
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We are wondering whether there are any discussions on this API and related docs. It is awesome that you guys already considered having DoFn to process asynchronously. Out of curiosity, this API seems to create a CompletionStage out of the input element (probably using framework's executor) and then allow user to chain on it. To us, it seems more convenient if the DoFn output a CompletionStage<OutputT> or pass in a CompletionStage<OutputT> to invoke upon completion.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We would like to discuss further on the async API and hopefully we will have a great support in Beam. Really appreciate the feedback!
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>> Xinyu
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>
>
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback

Re: [DISCUSSION] ParDo Async Java API

Posted by Scott Wegner <sc...@apache.org>.
Makes sense to me. We should make it easier to write DoFns in this pattern
that has emerged as common among I/O connectors.

Enabling asynchronous task chaining across a fusion tree is more
complicated but not necessary for this scenario.
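
As a concrete illustration, here is a hedged sketch of the batching variant
of that pattern, along the lines of the Bigtable example quoted below
(AsyncClient, readRows(), and Row are hypothetical stand-ins; imports, error
handling, and a time-based flush trigger are elided):

  class BatchingReadFn extends DoFn<ByteString, Row> {
    private static final int MAX_BATCH = 100;

    // One buffered element: the key plus what is needed to emit its result.
    private static class Entry {
      final ByteString key;
      final Instant timestamp;
      final BoundedWindow window;
      Entry(ByteString key, Instant timestamp, BoundedWindow window) {
        this.key = key;
        this.timestamp = timestamp;
        this.window = window;
      }
    }

    private transient AsyncClient client;  // hypothetical non-blocking client
    private transient List<Entry> buffer;
    private transient List<CompletionStage<Map<ByteString, Row>>> inFlight;
    private transient List<List<Entry>> inFlightEntries;

    @Setup
    public void setup() {
      client = AsyncClient.create();  // hypothetical
    }

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
      inFlight = new ArrayList<>();
      inFlightEntries = new ArrayList<>();
    }

    @ProcessElement
    public void process(
        @Element ByteString key, @Timestamp Instant ts, BoundedWindow window) {
      buffer.add(new Entry(key, ts, window));
      if (buffer.size() >= MAX_BATCH) {
        flush();  // issue a non-blocking batched read; do not wait for it
      }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) throws Exception {
      flush();
      for (int i = 0; i < inFlight.size(); i++) {
        // Force completion before the bundle commits.
        Map<ByteString, Row> rows = inFlight.get(i).toCompletableFuture().get();
        for (Entry e : inFlightEntries.get(i)) {
          // Emit each result with its original timestamp, in its original window.
          ctx.output(rows.get(e.key), e.timestamp, e.window);
        }
      }
    }

    private void flush() {
      if (buffer.isEmpty()) {
        return;
      }
      List<ByteString> keys = new ArrayList<>();
      for (Entry e : buffer) {
        keys.add(e.key);
      }
      inFlight.add(client.readRows(keys));  // hypothetical batched async read
      inFlightEntries.add(buffer);
      buffer = new ArrayList<>();
    }
  }

The bookkeeping above (windows, timestamps, joining at @FinishBundle) is
exactly the boilerplate a first-class async API could absorb.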

On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sn...@apache.org> wrote:

> It's also important to note that in many (most?) IO frameworks (gRPC,
> Finagle, etc.), asynchronous IO is typically completely non-blocking, so
> there generally won't be a large number of threads waiting for IO to
> complete.  (Netty, for example, uses a small pool of threads for its event
> loop group.)
>
> But in general I agree with Reuven: runners should not count threads in
> use in other thread pools for IO for the purpose of autoscaling (or most
> kinds of accounting).
>
> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>
>> As Steve said, the main rationale for this is so that asynchronous IOs
>> (or in general, asynchronous remote calls) can be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>>
>> Reuven
>>
>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> I think I agree with a lot of what you said here; I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>>
>>> From my usage of Beam, I feel like the big benefit of async DoFns would
>>> be to allow batched IO to be implemented more simply inside a DoFn.  Even
>>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations
>>> in ProcessElement and wait for them to complete in FinishBundle ([1][2],
>>> etc).  From my experience, things like error handling, emitting outputs as
>>> the result of an asynchronous operation completing (in the correct window,
>>> with the correct timestamp, etc) get pretty tricky, and it would be great
>>> for the SDK to provide support natively for it.
>>>
>>> It's also probably good to point out that really only DoFns that do IO
>>> should be asynchronous; normal CPU-bound DoFns have no reason to be
>>> asynchronous.
>>>
>>> A really good example of this is an IO I had written recently for
>>> Bigtable: it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from Bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo; however, this would limit throughput substantially (even assuming
>>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>>> ParDo).  My implementation batches many reads together (as they arrive at
>>> the DoFn), executes them once the batch is big enough (or some time
>>> passes), and then emits them once the batch read completes.  Emitting them
>>> in the correct window and handling errors gets tricky, so this is certainly
>>> something I'd love the framework itself to handle.
>>>
>>> I also don't see a big benefit of making a DoFn receive a future: if all
>>> a user is ever supposed to do is attach a continuation to it, that could
>>> just as easily be done by the runner itself, basically just invoking the
>>> entire ParDo as a continuation on the future (which then assumes the runner
>>> is even representing these tasks as futures internally).
>>>
>>> Making the DoFn itself actually return a future could be an option; even
>>> if the language itself doesn't support something like `await`, you could
>>> still implement it yourself in the DoFn. However, it seems like it'd be a
>>> strange contrast to the non-async version, which returns void.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> [2]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>
>>>
>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> If I understand correctly, the end goal is to process input elements
>>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>>> element types, followed by a DoFn that adds a callback to emit on
>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>>> issues) and whose finalize forces all completions. This would, of
>>>> course, interact poorly with processing time tracking, fusion breaks,
>>>> watermark tracking, counter attribution, window propagation, etc. so
>>>> it is desirable to make it part of the system itself.
>>>>
>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>>> API. The invoking of the downstream process could be chained onto
>>>> this, with all the implicit tracking and tracing set up correctly.
>>>> Taking a CompletionStage as input means a DoFn would not have to
>>>> create its output CompletionStage ex nihilo and possibly allow for
>>>> better chaining (depending on the asynchronous APIs used).
>>>>
>>>> Even better might be to simply let the invocation of all
>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>>> await primitive to relinquish control in the middle of a function body,
>>>> this might be hard.
>>>>
>>>> I think for correctness, completion would have to be forced at the end
>>>> of each bundle. If your bundles are large enough, this may not be that
>>>> big of a deal. In this case you could also start executing subsequent
>>>> bundles while waiting for prior ones to complete.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>>> <co...@gmail.com> wrote:
>>>> >>
>>>> >> I'd love to see something like this as well.  Also +1 to
>>>> process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>>>> benefit to passing a future in, since the framework itself could hook up
>>>> the process function to complete when the future completes.
>>>> >
>>>> >
>>>> > One benefit we get by wrapping the input with CompletionStage is to
>>>> mandate[1] that users chain their processing logic to the input future;
>>>> thereby ensuring asynchrony for the most part. However, it is still
>>>> possible for users to go out of their way and write blocking code.
>>>> >
>>>> > Although, I am not sure how counterintuitive it is for the runners
>>>> to wrap the input element into a future before passing it to the user code.
>>>> >
>>>> > Bharath
>>>> >
>>>> > [1] CompletionStage interface does not define methods for initially
>>>> creating, forcibly completing normally or exceptionally, probing completion
>>>> status or results, or awaiting completion of a stage. Implementations of
>>>> CompletionStage may provide means of achieving such effects, as appropriate
>>>> >
>>>> >
>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>> >>
>>>> >> I think your concerns are valid but I want to clarify about "first
>>>> class async APIs". Does "first class" mean that it is a well-encapsulated
>>>> abstraction? or does it mean that the user can more or less do whatever
>>>> they want? These are opposite but both valid meanings for "first class", to
>>>> me.
>>>> >>
>>>> >> I would not want to encourage users to do explicit multi-threaded
>>>> programming or control parallelism. Part of the point of Beam is to gain
>>>> big data parallelism without explicit multithreading. I see asynchronous
>>>> chaining of futures (or their best-approximation in your language of
>>>> choice) as a highly disciplined way of doing asynchronous dependency-driven
>>>> computation that is nonetheless conceptually, and readably, straight-line
>>>> code. Threads are not required nor the only way to execute this code. In
>>>> fact you might often want to execute without threading for a reference
>>>> implementation to provide canonically correct results. APIs that leak
>>>> lower-level details of threads are asking for trouble.
>>>> >>
>>>> >> One of our other ideas was to provide a dynamic parameter of type
>>>> ExecutorService. The SDK harness (pre-portability: the runner) would
>>>> control and observe parallelism while the user could simply register tasks.
>>>> Providing a future/promise API is even more disciplined.
>>>> >>
>>>> >> Kenn
>>>> >>
>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
>>>> wrote:
>>>> >>>
>>>> >>> A related question is how to make execution observable such that a
>>>> runner can make proper scaling decisions. Runners decide how to schedule
>>>> bundles within and across multiple worker instances, and can use
>>>> information about execution to make dynamic scaling decisions. First-class
>>>> async APIs seem like they would encourage DoFn authors to implement their
>>>> own parallelization, rather than deferring to the runner that should be
>>>> more capable of providing the right level of parallelism.
>>>> >>>
>>>> >>> In the Dataflow worker harness, we estimate execution time for
>>>> PTransform steps by sampling execution time on the execution thread and
>>>> attributing it to the currently invoked method. This approach is fairly
>>>> simple and possible because we assume that execution happens within the
>>>> thread controlled by the runner. Some DoFns already implement their own
>>>> async logic and break this assumption; I would expect more if we make async
>>>> built into the DoFn APIs.
>>>> >>>
>>>> >>> So: this isn't an argument against async APIs, but rather: does
>>>> this break execution observability, and are there other lightweight
>>>> mechanisms for attributing execution time of async work?
>>>> >>>
>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <kl...@google.com>
>>>> wrote:
>>>> >>>>
>>>> >>>> When executed over the portable APIs, it will be primarily the
>>>> Java SDK harness that makes all of these decisions. If we wanted runners to
>>>> have some insight into it we would have to add it to the Beam model protos.
>>>> I don't have any suggestions there, so I would leave it out of this
>>>> discussion until there's good ideas. We could learn a lot by trying it out
>>>> just in the SDK harness.
>>>> >>>>
>>>> >>>> Kenn
>>>> >>>>
>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xi...@gmail.com>
>>>> wrote:
>>>> >>>>>
>>>> >>>>> I don't have a strong opinion on the resolution of the futures
>>>> with regard to the @FinishBundle invocation. Leaving it unspecified does
>>>> give runners more room to implement it with their own support.
>>>> >>>>>
>>>> >>>>> Optimization is also another great point. Fuse seems pretty
>>>> complex to me too if we need to find a way to chain the resulting future
>>>> into the next transform, or leave the async transform as a standalone stage
>>>> initially?
>>>> >>>>>
>>>> >>>>> Btw, I was counting the number of replies before we hit the
>>>> portability. Seems after 4 replies fuse finally showed up :).
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Xinyu
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <kl...@google.com>
>>>> wrote:
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com> wrote:
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <
>>>> xinyuliu.us@gmail.com> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in
>>>> your use cases as well. Thanks for sharing the code from Scio! I can see in
>>>> your implementation that waiting for the future completion is part of the
>>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>>> async support so the user-level code won't need to implement this logic,
>>>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>>>> after future completion[1], and Flink also has AsyncFunction API [2] which
>>>> provides a ResultFuture similar to the API we discussed.
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> Can this be done correctly? What I mean is that if the process
>>>> dies, can you guarantee that no data is lost? Beam currently guarantees
>>>> this for FinishBundle, but if you use an arbitrary async framework this
>>>> might not be true.
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>>>> committed, *then* finishbundle has run. So it seems just as easy to say
>>>> *if* a bundle is committed, *then* every async result has been resolved.
>>>> >>>>>>
>>>> >>>>>> If the process dies the two cases should be naturally analogous.
>>>> >>>>>>
>>>> >>>>>> But it raises the question of whether they should be resolved
>>>> prior to finishbundle, after, or unspecified. I lean toward unspecified.
>>>> >>>>>>
>>>> >>>>>> That's for a single ParDo. Where this could get complex is
>>>> optimizing fused stages for greater asynchrony.
>>>> >>>>>>
>>>> >>>>>> Kenn
>>>> >>>>>>
>>>> >>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> A simple use case for this is to execute a Runnable
>>>> asynchronously in user's own executor. The following code illustrates
>>>> Kenn's option #2, with a very simple single-thread pool being the executor:
>>>> >>>>>>>>
>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>>> >>>>>>>>   @ProcessElement
>>>> >>>>>>>>   public void process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>>>> CompletableFuture.supplyAsync(
>>>> >>>>>>>>         () -> someOutput,
>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>>> >>>>>>>>     outputReceiver.output(future);
>>>> >>>>>>>>   }
>>>> >>>>>>>> }
>>>> >>>>>>>>
>>>> >>>>>>>> The neat thing about this API is that the user can choose
>>>> their own async framework and we only expect the output to be a
>>>> CompletionStage.
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> For the implementation of bundling, can we compose a
>>>> CompletableFuture from each element in the bundle, e.g.
>>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>>>> future is complete? Seems this might work.
>>>> >>>>>>>>
>>>> >>>>>>>> Thanks,
>>>> >>>>>>>> Xinyu
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> [1]
>>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>> >>>>>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>> >>>>>>>>
>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>>>> sniemitz@apache.org> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>>>> process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>>>> benefit to passing a future in, since the framework itself could hook up
>>>> the process function to complete when the future completes.
>>>> >>>>>>>>>
>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar
>>>> "kick off a future in ProcessElement, join it in FinishBundle" code, and
>>>> looking around Beam itself, a lot of built-in transforms do it as well.
>>>> Scio provides a few AsyncDoFn implementations [1] but it'd be great to see
>>>> this as a first-class concept in Beam itself.  Doing error handling,
>>>> concurrency, etc. correctly can be tricky.
>>>> >>>>>>>>>
>>>> >>>>>>>>> [1]
>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>>>> klk@google.com> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output
>>>> should also be a CompletionStage<OutputT>, since all you should do is async
>>>> chaining. We could enforce this by giving the DoFn an
>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Another possibility that might be even more robust against
>>>> poor future use could be process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>>> itself will be async chained, rather than counting on the user to do the
>>>> right thing.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> We should see how these look in real use cases. The way that
>>>> processing is split between @ProcessElement and @FinishBundle might
>>>> complicate things.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Kenn
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>>>> xinyuliu.us@gmail.com> wrote:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Hi, guys,
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we
>>>> got a lot of asks for an asynchronous processing API. There are a few
>>>> reasons for these asks:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> The users here are experienced in asynchronous programming.
>>>> With async frameworks such as Netty and ParSeq and libs like async jersey
>>>> client, they are able to make remote calls efficiently and the libraries
>>>> help manage the execution threads underneath. Async remote calls are very
>>>> common in most of our streaming applications today.
>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async
>>>> processing helps for less resource usage and fast computation (less context
>>>> switch).
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> I asked about the async support in a previous email thread.
>>>> The following API was mentioned in the reply:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>> >>>>>>>>>>>     @ProcessElement
>>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>>> element, ...) {
>>>> >>>>>>>>>>>       element.thenApply(...)
>>>> >>>>>>>>>>>     }
>>>> >>>>>>>>>>>   }
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> We are wondering whether there are any discussions on this
>>>> API and related docs. It is awesome that you guys already considered having
>>>> DoFn to process asynchronously. Out of curiosity, this API seems to create
>>>> a CompletionStage out of the input element (probably using framework's
>>>> executor) and then allow user to chain on it. To us, it seems more
>>>> convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>>>> CompletionStage<OutputT> to invoke upon completion.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> We would like to discuss further on the async API and
>>>> hopefully we will have a great support in Beam. Really appreciate the
>>>> feedback!
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Thanks,
>>>> >>>>>>>>>>> Xinyu
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>>>>
>>>

-- 




Got feedback? tinyurl.com/swegner-feedback

Re: [DISCUSSION] ParDo Async Java API

Posted by Steve Niemitz <sn...@apache.org>.
It's also important to note that in many (most?) IO frameworks (gRPC,
Finagle, etc.), asynchronous IO is typically completely non-blocking, so
there generally won't be a large number of threads waiting for IO to
complete.  (Netty, for example, uses a small pool of threads for its event
loop group.)

But in general I agree with Reuven: runners should not count threads in use
in other thread pools for IO for the purpose of autoscaling (or most kinds
of accounting).
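
To make that concrete, here is a hedged sketch of what a fully non-blocking
DoFn could look like under the process(@Element InputT, @Output
OutputReceiver<CompletionStage<OutputT>>) shape proposed earlier in this
thread (that API does not exist yet, and asyncClient and its lookup() call
are hypothetical stand-ins for a non-blocking client such as a gRPC stub):

  new DoFn<String, String>() {
    @ProcessElement
    public void process(
        @Element String element,
        @Output OutputReceiver<CompletionStage<String>> outputReceiver) {
      // lookup() is a hypothetical non-blocking call returning a
      // CompletionStage; the continuation below runs on the client's event
      // loop, so no runner-owned thread ever blocks waiting for the response.
      outputReceiver.output(
          asyncClient.lookup(element).thenApply(response -> response.body()));
    }
  }

The runner (or SDK harness) could then chain downstream processing, and any
execution-time accounting, onto the returned stage instead of sampling a
blocked thread.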

On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:

> As Steve said, the main rationale for this is so that asynchronous IOs (or
> in general, asynchronous remote calls) can be made. To some degree this
> addresses Scott's concern: the asynchronous threads should be, for the most
> part, simply waiting for IOs to complete; the reason to do the waiting
> asynchronously is so that the main threadpool does not become blocked,
> causing the pipeline to become IO bound. A runner like Dataflow should not
> be tracking these threads for the purpose of autoscaling, as adding more
> workers will (usually) not cause these calls to complete any faster.
>
> Reuven
>
> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sn...@apache.org> wrote:
>
>> I think I agree with a lot of what you said here; I'm just going to
>> restate my initial use-case to try to make it more clear as well.
>>
>> From my usage of Beam, I feel like the big benefit of async DoFns would
>> be to allow batched IO to be implemented more simply inside a DoFn.  Even
>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations
>> in ProcessElement and wait for them to complete in FinishBundle ([1][2],
>> etc).  From my experience, things like error handling, emitting outputs as
>> the result of an asynchronous operation completing (in the correct window,
>> with the correct timestamp, etc) get pretty tricky, and it would be great
>> for the SDK to provide support natively for it.
>>
>> It's also probably good to point out that really only DoFns that do IO
>> should be asynchronous; normal CPU-bound DoFns have no reason to be
>> asynchronous.
>>
>> A really good example of this is an IO I had written recently for
>> Bigtable: it takes an input PCollection of ByteStrings representing row
>> keys, and returns a PCollection of the row data from Bigtable.  Naively
>> this could be implemented by simply blocking on the Bigtable read inside
>> the ParDo; however, this would limit throughput substantially (even assuming
>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>> ParDo).  My implementation batches many reads together (as they arrive at
>> the DoFn), executes them once the batch is big enough (or some time
>> passes), and then emits them once the batch read completes.  Emitting them
>> in the correct window and handling errors gets tricky, so this is certainly
>> something I'd love the framework itself to handle.
>>
>> I also don't see a big benefit of making a DoFn receive a future: if all
>> a user is ever supposed to do is attach a continuation to it, that could
>> just as easily be done by the runner itself, basically just invoking the
>> entire ParDo as a continuation on the future (which then assumes the runner
>> is even representing these tasks as futures internally).
>>
>> Making the DoFn itself actually return a future could be an option, even
>> if the language itself doesn't support something like `await`, you could
>> still implement it yourself in the DoFn, however, it seems like it'd be a
>> strange contrast to the non-async version, which returns void.
>>
>> [1]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>> [2]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>
>>
>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> If I understand correctly, the end goal is to process input elements
>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>> element types, followed by a DoFn that adds a callback to emit on
>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>> issues) and whose finalize forces all completions. This would, of
>>> course, interact poorly with processing time tracking, fusion breaks,
>>> watermark tracking, counter attribution, window propagation, etc. so
>>> it is desirable to make it part of the system itself.
>>>
>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>> API. The invoking of the downstream process could be chained onto
>>> this, with all the implicit tracking and tracing set up correctly.
>>> Taking a CompletionStage as input means a DoFn would not have to
>>> create its output CompletionStage ex nihilo and possibly allow for
>>> better chaining (depending on the asynchronous APIs used).
>>>
>>> Even better might be to simply let the invocation of all
>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>> await primitive to relinquish control in the middle of a function body
>>> this might be hard.
>>>
>>> I think for correctness, completion would have to be forced at the end
>>> of each bundle. If your bundles are large enough, this may not be that
>>> big of a deal. In this case you could also start executing subsequent
>>> bundles while waiting for prior ones to complete.
>>>
>>>
>>>
>>>
>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>> <co...@gmail.com> wrote:
>>> >>
>>> >> I'd love to see something like this as well.  Also +1 to
>>> process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much
>>> benefit to passing a future in, since the framework itself could hook up
>>> the process function to complete when the future completes.
>>> >
>>> >
>>> > One benefit we get by wrapping the input with CompletionStage is to
>>> mandate[1] users to chain their processing logic to the input future;
>>> thereby, ensuring asynchrony for the most part. However, it is still
>>> possible for users to go out of their way and write blocking code.
>>> >
>>> > Although, I am not sure how counter intuitive it is for the runners to
>>> wrap the input element into a future before passing it to the user code.
>>> >
>>> > Bharath
>>> >
>>> > [1] CompletionStage interface does not define methods for initially
>>> creating, forcibly completing normally or exceptionally, probing completion
>>> status or results, or awaiting completion of a stage. Implementations of
>>> CompletionStage may provide means of achieving such effects, as appropriate
>>> >
>>> >
>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org>
>>> wrote:
>>> >>
>>> >> I think your concerns are valid but i want to clarify about "first
>>> class async APIs". Does "first class" mean that it is a well-encapsulated
>>> abstraction? or does it mean that the user can more or less do whatever
>>> they want? These are opposite but both valid meanings for "first class", to
>>> me.
>>> >>
>>> >> I would not want to encourage users to do explicit multi-threaded
>>> programming or control parallelism. Part of the point of Beam is to gain
>>> big data parallelism without explicit multithreading. I see asynchronous
>>> chaining of futures (or their best-approximation in your language of
>>> choice) as a highly disciplined way of doing asynchronous dependency-driven
>>> computation that is nonetheless conceptually, and readably, straight-line
>>> code. Threads are not required nor the only way to execute this code. In
>>> fact you might often want to execute without threading for a reference
>>> implementation to provide canonically correct results. APIs that leak
>>> lower-level details of threads are asking for trouble.
>>> >>
>>> >> One of our other ideas was to provide a dynamic parameter of type
>>> ExecutorService. The SDK harness (pre-portability: the runner) would
>>> control and observe parallelism while the user could simply register tasks.
>>> Providing a future/promise API is even more disciplined.
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org>
>>> wrote:
>>> >>>
>>> >>> A related question is how to make execution observable such that a
>>> runner can make proper scaling decisions. Runners decide how to schedule
>>> bundles within and across multiple worker instances, and can use
>>> information about execution to make dynamic scaling decisions. First-class
>>> async APIs seem like they would encourage DoFn authors to implement their
>>> own parallelization, rather than deferring to the runner that should be
>>> more capable of providing the right level of parallelism.
>>> >>>
>>> >>> In the Dataflow worker harness, we estimate execution time to
>>> PTransform steps by sampling execution time on the execution thread and
>>> attributing it to the currently invoked method. This approach is fairly
>>> simple and possible because we assume that execution happens within the
>>> thread controlled by the runner. Some DoFn's already implement their own
>>> async logic and break this assumption; I would expect more if we make async
>>> built into the DoFn APIs.
>>> >>>
>>> >>> So: this isn't an argument against async APIs, but rather: does this
>>> break execution observability, and are there other lightweight mechanisms
>>> for attributing execution time of async work?
>>> >>>
>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <kl...@google.com>
>>> wrote:
>>> >>>>
>>> >>>> When executed over the portable APIs, it will be primarily the Java
>>> SDK harness that makes all of these decisions. If we wanted runners to have
>>> some insight into it we would have to add it to the Beam model protos. I
>>> don't have any suggestions there, so I would leave it out of this
>>> discussion until there's good ideas. We could learn a lot by trying it out
>>> just in the SDK harness.
>>> >>>>
>>> >>>> Kenn
>>> >>>>
>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xi...@gmail.com>
>>> wrote:
>>> >>>>>
>>> >>>>> I don't have a strong opinion on the resolution of the futures
>>> regarding to @FinishBundle invocation. Leaving it to be unspecified does
>>> give runners more room to implement it with their own support.
>>> >>>>>
>>> >>>>> Optimization is also another great point. Fuse seems pretty
>>> complex to me too if we need to find a way to chain the resulting future
>>> into the next transform, or leave the async transform as a standalone stage
>>> initially?
>>> >>>>>
>>> >>>>> Btw, I was counting the number of replies before we hit the
>>> portability. Seems after 4 replies fuse finally showed up :).
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Xinyu
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <kl...@google.com>
>>> wrote:
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com wrote:
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xi...@gmail.com>
>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in
>>> your use cases as well. Thanks for sharing the code from Scio! I can see in
>>> your implementation that waiting for the future completion is part of the
>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>> async support so the user-level code won't need to implement this logic,
>>> e.g. Samza has an AsyncSteamTask api that provides a callback to invoke
>>> after future completion[1], and Flink also has AsyncFunction api [2] which
>>> provides a ResultFuture similar to the API we discussed.
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> Can this be done correctly? What I mean is that if the process
>>> dies, can you guarantee that no data is lost? Beam currently guarantees
>>> this for FinishBundle, but if you use an arbitrary async framework this
>>> might not be true.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is
>>> committed, *then* finishbundle has run. So it seems just as easy to say
>>> *if* a bundle is committed, *then* every async result has been resolved.
>>> >>>>>>
>>> >>>>>> If the process dies the two cases should be naturally analogous.
>>> >>>>>>
>>> >>>>>> But it raises the question of whether they should be resolved
>>> prior to finishbundle, after, or unspecified. I lean toward unspecified.
>>> >>>>>>
>>> >>>>>> That's for a single ParDo. Where this could get complex is
>>> optimizing fused stages for greater asynchrony.
>>> >>>>>>
>>> >>>>>> Kenn
>>> >>>>>>
>>> >>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> A simple use case for this is to execute a Runnable
>>> asynchronously in user's own executor. The following code illustrates
>>> Kenn's option #2, with a very simple single-thread pool being the executor:
>>> >>>>>>>>
>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>> >>>>>>>>   @ProcessElement
>>> >>>>>>>>   public void process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>> >>>>>>>>     CompletableFuture<OutputT> future =
>>> CompletableFuture.supplyAsync(
>>> >>>>>>>>         () -> someOutput,
>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>> >>>>>>>>     outputReceiver.output(future);
>>> >>>>>>>>   }
>>> >>>>>>>> }
>>> >>>>>>>>
>>> >>>>>>>> The neat thing about this API is that the user can choose their
>>> own async framework and we only expect the output to be a CompletionStage.
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> For the implementation of bundling, can we compose a
>>> CompletableFuture from each element in the bundle, e.g.
>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when this
>>> future is complete? Seems this might work.
>>> >>>>>>>>
>>> >>>>>>>> Thanks,
>>> >>>>>>>> Xinyu
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> [1]
>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> >>>>>>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>> >>>>>>>>
>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <
>>> sniemitz@apache.org> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to
>>> process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if there's much
>>> benefit to passing a future in, since the framework itself could hook up
>>> the process function to complete when the future completes.
>>> >>>>>>>>>
>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar
>>> "kick off a future in ProcessElement, join it in FinishBundle" code, and
>>> looking around beam itself a lot of built-in transforms do it as well.
>>> Scio provides a few AsyncDoFn implementations [1] but it'd be great to see
>>> this as a first-class concept in beam itself.  Doing error handling,
>>> concurrency, etc correctly can be tricky.
>>> >>>>>>>>>
>>> >>>>>>>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>> >>>>>>>>>
>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <
>>> klk@google.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output
>>> should also be a CompletionStage<OutputT>, since all you should do is async
>>> chaining. We could enforce this by giving the DoFn an
>>> OutputReceiver(CompletionStage<OutputT>).
>>> >>>>>>>>>>
>>> >>>>>>>>>> Another possibility that might be even more robust against
>>> poor future use could be process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>> itself will be async chained, rather than counting on the user to do the
>>> right thing.
>>> >>>>>>>>>>
>>> >>>>>>>>>> We should see how these look in real use cases. The way that
>>> processing is split between @ProcessElement and @FinishBundle might
>>> complicate things.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Kenn
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <
>>> xinyuliu.us@gmail.com> wrote:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Hi, guys,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we
>>> got a lot of asks for an asynchronous processing API. There are a few
>>> reasons for these asks:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> The users here are experienced in asynchronous programming.
>>> With async frameworks such as Netty and ParSeq and libs like async jersey
>>> client, they are able to make remote calls efficiently and the libraries
>>> help manage the execution threads underneath. Async remote calls are very
>>> common in most of our streaming applications today.
>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async
>>> processing helps for less resource usage and fast computation (less context
>>> switch).
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I asked about the async support in a previous email thread.
>>> The following API was mentioned in the reply:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>> >>>>>>>>>>>     @ProcessElement
>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT>
>>> element, ...) {
>>> >>>>>>>>>>>       element.thenApply(...)
>>> >>>>>>>>>>>     }
>>> >>>>>>>>>>>   }
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> We are wondering whether there are any discussions on this
>>> API and related docs. It is awesome that you guys already considered having
>>> DoFn to process asynchronously. Out of curiosity, this API seems to create
>>> a CompletionState out of the input element (probably using framework's
>>> executor) and then allow user to chain on it. To us, it seems more
>>> convenient if the DoFn output a CompletionStage<OutputT> or pass in a
>>> CompletionStage<OutputT> to invoke upon completion.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> We would like to discuss further on the async API and
>>> hopefully we will have a great support in Beam. Really appreciate the
>>> feedback!
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Thanks,
>>> >>>>>>>>>>> Xinyu
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> Got feedback? tinyurl.com/swegner-feedback
>>>
>>

Re: [DISCUSSION] ParDo Async Java API

Posted by Reuven Lax <re...@google.com>.
As Steve said, the main rationale for this is so that asynchronous IOs (or,
in general, asynchronous remote calls) can be made. To some degree this
addresses Scott's concern: the asynchronous threads should be, for the most
part, simply waiting for IOs to complete; the reason to do the waiting
asynchronously is so that the main threadpool does not become blocked,
causing the pipeline to become IO bound. A runner like Dataflow should not
be tracking these threads for the purpose of autoscaling, as adding more
workers will (usually) not cause these calls to complete any faster.
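
As a hedged illustration (fetchBlocking, Row, key, and the pool size are
made up), even a blocking client can have its waiting pushed off the main
threadpool:

  // A dedicated IO pool absorbs the waiting; the runner-controlled thread
  // hands off the work and returns immediately. Adding workers would add
  // CPU, not make the remote calls return sooner.
  ExecutorService ioPool = Executors.newFixedThreadPool(32);

  CompletableFuture<Row> pending =
      CompletableFuture.supplyAsync(() -> fetchBlocking(key), ioPool);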

Reuven


Re: [DISCUSSION] ParDo Async Java API

Posted by Steve Niemitz <sn...@apache.org>.
I think I agree with a lot of what you said here; I'm just going to restate
my initial use case to try to make it clearer as well.

From my usage of Beam, I feel like the big benefit of async DoFns would be
to allow batched IO to be implemented more simply inside a DoFn. Even in
the Beam SDK itself, there are a lot of IOs that batch up IO operations in
ProcessElement and wait for them to complete in FinishBundle ([1][2],
etc.). From my experience, things like error handling and emitting outputs
as the result of an asynchronous operation completing (in the correct
window, with the correct timestamp, etc.) get pretty tricky, and it would
be great for the SDK to provide support for this natively.
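
As a rough sketch of that recurring pattern (asyncWrite is a hypothetical
client call returning a CompletableFuture; the error handling and windowing
mentioned above are exactly what's elided here):

  // imports: java.util.*, java.util.concurrent.CompletableFuture,
  //          org.apache.beam.sdk.transforms.DoFn
  class AsyncWriteFn extends DoFn<String, Void> {
    private transient List<CompletableFuture<Void>> pending;

    @StartBundle
    public void startBundle() {
      pending = new ArrayList<>();
    }

    @ProcessElement
    public void process(@Element String element) {
      pending.add(asyncWrite(element));  // kick off, don't block
    }

    @FinishBundle
    public void finishBundle() {
      // Join everything before the bundle is allowed to commit.
      CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }
  }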

It's also probably good to point out that really only DoFns that do IO
should be asynchronous; normal CPU-bound DoFns have no reason to be
asynchronous.

A really good example of this is an IO I had written recently for Bigtable:
it takes an input PCollection of ByteStrings representing row keys, and
returns a PCollection of the row data from Bigtable. Naively this could be
implemented by simply blocking on the Bigtable read inside the ParDo, but
this would limit throughput substantially (even assuming an average read
latency of 1ms, that's still only 1000 QPS per instance of the ParDo).
My implementation batches many reads together (as they arrive at the DoFn),
executes them once the batch is big enough (or some time passes), and then
emits them once the batch read completes. Emitting them in the correct
window and handling errors gets tricky, so this is certainly something I'd
love the framework itself to handle.
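
In rough outline (this is not the actual implementation; readRows stands in
for a hypothetical async bulk read, Row for the client's row type, and for
simplicity every key in a batch is assumed to share a single window):

  // imports additionally: com.google.protobuf.ByteString, org.joda.time.Instant,
  //                       org.apache.beam.sdk.transforms.windowing.BoundedWindow
  class BatchedReadFn extends DoFn<ByteString, Row> {
    private static final int MAX_BATCH = 100;
    private transient List<ByteString> batch;
    private transient List<Pending> inFlight;

    // Remember the window/timestamp a batch belongs to so its rows can be
    // emitted correctly later; this is exactly the bookkeeping that gets tricky.
    static class Pending {
      final CompletableFuture<List<Row>> rows;
      final Instant timestamp;
      final BoundedWindow window;

      Pending(CompletableFuture<List<Row>> rows, Instant ts, BoundedWindow w) {
        this.rows = rows;
        this.timestamp = ts;
        this.window = w;
      }
    }

    @StartBundle
    public void startBundle() {
      batch = new ArrayList<>();
      inFlight = new ArrayList<>();
    }

    @ProcessElement
    public void process(@Element ByteString key, @Timestamp Instant ts, BoundedWindow w) {
      batch.add(key);
      if (batch.size() >= MAX_BATCH) {
        inFlight.add(new Pending(readRows(batch), ts, w));  // one RPC per batch
        batch = new ArrayList<>();
      }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext ctx) {
      // Flushing a final partial batch is elided for brevity.
      for (Pending p : inFlight) {
        for (Row row : p.rows.join()) {            // block only at bundle end
          ctx.output(row, p.timestamp, p.window);  // emit in the right window
        }
      }
    }
  }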

I also don't see a big benefit of making a DoFn receive a future: if all a
user is ever supposed to do is attach a continuation to it, that could just
as easily be done by the runner itself, basically by invoking the entire
ParDo as a continuation on the future (which assumes the runner is even
representing these tasks as futures internally).

Making the DoFn itself actually return a future could be an option. Even if
the language doesn't support something like `await`, you could still
implement it yourself in the DoFn; however, it seems like it'd be a strange
contrast to the non-async version, which returns void.
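
Purely as a hypothetical shape (nothing like this exists in Beam today, and
Request, Response, and asyncCall are made-up names), a future-returning
variant might read:

  // Hypothetical: the runner would treat the returned future as the signal
  // that this element has been fully processed.
  @ProcessElement
  public CompletionStage<Void> process(
      @Element Request req, OutputReceiver<Response> out) {
    return asyncCall(req)                       // made-up async client call
        .thenAccept(resp -> out.output(resp));  // emit when the call completes
  }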

[1]
https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
[2]
https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080



Re: [DISCUSSION] ParDo Async Java API

Posted by Robert Bradshaw <ro...@google.com>.
If I understand correctly, the end goal is to process input elements of a
DoFn asynchronously. Were I to do this naively, I would implement DoFns
that simply take and receive [Serializable?]CompletionStages as element
types, followed by a DoFn that adds a callback to emit on completion
(possibly via a queue to avoid being-on-the-wrong-thread issues) and whose
finalize forces all completions. This would, of course, interact poorly
with processing-time tracking, fusion breaks, watermark tracking, counter
attribution, window propagation, etc., so it is desirable to make it part
of the system itself.

Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
API. The invocation of the downstream process could be chained onto this,
with all the implicit tracking and tracing set up correctly. Taking a
CompletionStage as input means a DoFn would not have to create its output
CompletionStage ex nihilo, and possibly allows for better chaining
(depending on the asynchronous APIs used).
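
Concretely, the proposed shape sketched earlier in the thread (with
asyncClient as a hypothetical non-blocking client, and Request/Response as
placeholder types) would let a user emit a stage and leave the chaining to
the framework:

  new DoFn<Request, Response>() {
    @ProcessElement
    public void process(
        @Element Request element,
        @Output OutputReceiver<CompletionStage<Response>> receiver) {
      receiver.output(asyncClient.call(element));  // chain downstream, never block
    }
  };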

Even better might be to simply let the invocation of all DoFn.process()
methods be asynchronous, but as Java doesn't offer an await primitive to
relinquish control in the middle of a function body, this might be hard.

I think for correctness, completion would have to be forced at the end
of each bundle. If your bundles are large enough, this may not be that
big of a deal. In this case you could also start executing subsequent
bundles while waiting for prior ones to complete.




On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
<co...@gmail.com> wrote:
>>
>> I'd love to see something like this as well.  Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>
>
> One benefit we get by wrapping the input with CompletionStage is to mandate[1] users to chain their processing logic to the input future; thereby, ensuring asynchrony for the most part. However, it is still possible for users to go out of their way and write blocking code.
>
> Although, I am not sure how counter intuitive it is for the runners to wrap the input element into a future before passing it to the user code.
>
> Bharath
>
> [1] CompletionStage interface does not define methods for initially creating, forcibly completing normally or exceptionally, probing completion status or results, or awaiting completion of a stage. Implementations of CompletionStage may provide means of achieving such effects, as appropriate
>
>
> On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <ke...@apache.org> wrote:
>>
>> I think your concerns are valid but i want to clarify about "first class async APIs". Does "first class" mean that it is a well-encapsulated abstraction? or does it mean that the user can more or less do whatever they want? These are opposite but both valid meanings for "first class", to me.
>>
>> I would not want to encourage users to do explicit multi-threaded programming or control parallelism. Part of the point of Beam is to gain big data parallelism without explicit multithreading. I see asynchronous chaining of futures (or their best-approximation in your language of choice) as a highly disciplined way of doing asynchronous dependency-driven computation that is nonetheless conceptually, and readably, straight-line code. Threads are not required nor the only way to execute this code. In fact you might often want to execute without threading for a reference implementation to provide canonically correct results. APIs that leak lower-level details of threads are asking for trouble.
>>
>> One of our other ideas was to provide a dynamic parameter of type ExecutorService. The SDK harness (pre-portability: the runner) would control and observe parallelism while the user could simply register tasks. Providing a future/promise API is even more disciplined.
>>
>> Kenn

Re: [DISCUSSION] ParDo Async Java API

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
>
> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>

One benefit we get by wrapping the input with CompletionStage is to
mandate[1] that users chain their processing logic onto the input future,
thereby ensuring asynchrony for the most part. However, it is still
possible for users to go out of their way and write blocking code.
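
As a rough sketch of how that reads (the @Element CompletionStage form is
the one discussed upthread; transform(...) is a placeholder for user
logic):

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(
      @Element CompletionStage<InputT> element,
      @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    // The wrapped input can only be chained on, so the processing stays
    // asynchronous by construction.
    outputReceiver.output(element.thenApply(input -> transform(input)));
  }
}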

That said, I am not sure how counterintuitive it would be for runners to
wrap the input element in a future before passing it to the user code.
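
For what it's worth, the wrapping itself could be as cheap as something
like the sketch below, so the concern seems more about API ergonomics than
overhead:

CompletionStage<InputT> wrapped = CompletableFuture.completedFuture(element);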

Bharath

[1] The CompletionStage interface does not define methods for initially
creating, forcibly completing normally or exceptionally, probing completion
status or results, or awaiting completion of a stage. Implementations of
CompletionStage may provide means of achieving such effects, as appropriate.



Re: [DISCUSSION] ParDo Async Java API

Posted by Kenneth Knowles <ke...@apache.org>.
I think your concerns are valid, but I want to clarify what "first class
async APIs" means. Does "first class" mean that it is a well-encapsulated
abstraction? Or does it mean that the user can more or less do whatever
they want? These are opposite but both valid meanings of "first class", to
me.

I would not want to encourage users to do explicit multi-threaded
programming or to control parallelism. Part of the point of Beam is to
gain big-data parallelism without explicit multithreading. I see
asynchronous chaining of futures (or their best approximation in your
language of choice) as a highly disciplined way of doing asynchronous,
dependency-driven computation that is nonetheless conceptually, and
readably, straight-line code. Threads are neither required nor the only
way to execute this code. In fact, you might often want to execute without
threading in a reference implementation to provide canonically correct
results. APIs that leak lower-level details of threads are asking for
trouble.
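
For example (fetchUser and fetchAccount are hypothetical async clients,
not anything in Beam), such chaining reads as straight-line code even
though nothing here dictates how or where it runs:

CompletionStage<Account> result =
    fetchUser(id)                                 // async remote call
        .thenApply(user -> user.getAccountId())   // pure transformation
        .thenCompose(accountId -> fetchAccount(accountId)); // dependent async call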

One of our other ideas was to provide a dynamic parameter of type
ExecutorService. The SDK harness (pre-portability: the runner) would
control and observe parallelism while the user could simply register tasks.
Providing a future/promise API is even more disciplined.
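
A minimal sketch of that idea, assuming a hypothetical injected
ExecutorService parameter (this is not an existing Beam API):

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(
      @Element InputT element,
      ExecutorService executor, // hypothetical: owned by the SDK harness
      @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    // The user only registers work; the harness controls and observes
    // the parallelism. expensiveCall(...) is a placeholder.
    outputReceiver.output(
        CompletableFuture.supplyAsync(() -> expensiveCall(element), executor));
  }
}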

Kenn


Re: [DISCUSSION] ParDo Async Java API

Posted by Scott Wegner <sc...@apache.org>.
A related question is how to make execution observable so that a runner
can make proper scaling decisions. Runners decide how to schedule bundles
within and across multiple worker instances, and they can use information
about execution to make dynamic scaling decisions. First-class async APIs
seem like they would encourage DoFn authors to implement their own
parallelization, rather than deferring to the runner, which should be more
capable of providing the right level of parallelism.

In the Dataflow worker harness, we attribute execution time to PTransform
steps by sampling the execution thread and charging each sample to the
currently invoked method. This approach is fairly simple, and it is
possible because we assume that execution happens within the thread
controlled by the runner. Some DoFns already implement their own async
logic and break this assumption; I would expect more to do so if we build
async into the DoFn APIs.

So: this isn't an argument against async APIs, but rather a question: does
this break execution observability, and are there other lightweight
mechanisms for attributing the execution time of async work?
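
For reference, a minimal sketch of the sampling idea (illustrative names
only, not the actual worker code):

import java.util.concurrent.ConcurrentHashMap;

// Charges wall time to whichever step the execution thread is currently
// in. Work an async DoFn hands off to other threads is invisible here.
class StepSampler {
  private volatile String currentStep = "<unknown>";
  private final ConcurrentHashMap<String, Long> millisPerStep =
      new ConcurrentHashMap<>();

  void enterStep(String step) { currentStep = step; } // called by the harness

  void onSampleTick(long periodMillis) { // called by a sampling timer thread
    millisPerStep.merge(currentStep, periodMillis, Long::sum);
  }
}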


--
Got feedback? tinyurl.com/swegner-feedback

Re: [DISCUSSION] ParDo Async Java API

Posted by Kenneth Knowles <kl...@google.com>.
When executed over the portable APIs, it will be primarily the Java SDK
harness that makes all of these decisions. If we wanted runners to have
some insight into it, we would have to add it to the Beam model protos. I
don't have any suggestions there, so I would leave it out of this
discussion until there are good ideas. We could learn a lot by trying it
out just in the SDK harness.

Kenn


Re: [DISCUSSION] ParDo Async Java API

Posted by Xinyu Liu <xi...@gmail.com>.
I don't have a strong opinion on the resolution of the futures with regard
to the @FinishBundle invocation. Leaving it unspecified does give runners
more room to implement it with their own support.

Optimization is another great point. Fusion seems pretty complex to me too
if we need to find a way to chain the resulting future into the next
transform; perhaps we could leave the async transform as a standalone
stage initially?
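
As a sketch of the chaining option (fuse, first, and second are made-up
names; this assumes each fused transform can be expressed as an async
function):

// The first stage's future is chained into the second with thenCompose,
// so the fused stage stays asynchronous end to end without blocking.
static <A, B, C> Function<A, CompletionStage<C>> fuse(
    Function<A, CompletionStage<B>> first,
    Function<B, CompletionStage<C>> second) {
  return input -> first.apply(input).thenCompose(second);
}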

Btw, I was counting the number of replies before we hit portability. Seems
after 4 replies fusion finally showed up :).

Thanks,
Xinyu



Re: [DISCUSSION] ParDo Async Java API

Posted by Kenneth Knowles <kl...@google.com>.
On Tue, Jan 22, 2019, 17:23 Reuven Lax <relax@google.com> wrote:

>
>
> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xi...@gmail.com> wrote:
>
>> @Steve: it's good to see that this is going to be useful in your use
>> cases as well. Thanks for sharing the code from Scio! I can see in your
>> implementation that waiting for the future completion is part of the
>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>> async support so the user-level code won't need to implement this logic,
>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>> after future completion[1], and Flink also has AsyncFunction api [2] which
>> provides a ResultFuture similar to the API we discussed.
>>
>
> Can this be done correctly? What I mean is that if the process dies, can
> you guarantee that no data is lost? Beam currently guarantees this for
> FinishBundle, but if you use an arbitrary async framework this might not be
> true.
>

What a Beam runner guarantees is that *if* the bundle is committed, *then*
@FinishBundle has run. So it seems just as easy to say: *if* a bundle is
committed, *then* every async result has been resolved.
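
One way a harness could uphold that guarantee (a sketch only, assuming it
collects the future of every element in the bundle):

List<CompletableFuture<?>> pending = new ArrayList<>(); // filled per element
// ...
// Before committing the bundle, wait for every async result to resolve:
CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();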

If the process dies the two cases should be naturally analogous.

But it raises the question of whether the futures should be resolved prior
to @FinishBundle, after it, or at an unspecified point. I lean toward
unspecified.

That's for a single ParDo. Where this could get complex is optimizing fused
stages for greater asynchrony.

Kenn


>
>> A simple use case for this is to execute a Runnable asynchronously in
>> user's own executor. The following code illustrates Kenn's option #2, with
>> a very simple single-thread pool being the executor:
>>
>> new DoFn<InputT, OutputT>() {
>>   @ProcessElement
>>   public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>         () -> someOutput,
>>         Executors.newSingleThreadExecutor());
>>     outputReceiver.output(future);
>>   }
>> }
>>
>> The neat thing about this API is that the user can choose their own async framework and we only expect the output to be a CompletionStage.
>>
>>
>> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
>>
>> Thanks,
>> Xinyu
>>
>>
>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>
>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sn...@apache.org>
>> wrote:
>>
>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>> don't know if there's much benefit to passing a future in, since the
>>> framework itself could hook up the process function to complete when the
>>> future completes.
>>>
>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>> Beam itself, a lot of built-in transforms do it as well.  Scio provides a
>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>> first-class concept in Beam itself.  Doing error handling, concurrency, etc.
>>> correctly can be tricky.
>>>
>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>
>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:
>>>
>>>> If the input is a CompletionStage<InputT> then the output should also
>>>> be a CompletionStage<OutputT>, since all you should do is async chaining.
>>>> We could enforce this by giving the DoFn an
>>>> OutputReceiver<CompletionStage<OutputT>>.
>>>>
>>>> Another possibility that might be even more robust against poor future
>>>> use could be process(@Element InputT element, @Output
>>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>>> itself will be async chained, rather than counting on the user to do the
>>>> right thing.
>>>>
>>>> We should see how these look in real use cases. The way that processing
>>>> is split between @ProcessElement and @FinishBundle might complicate things.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, guys,
>>>>>
>>>>> As more users try out Beam running on the SamzaRunner, we have gotten a lot of
>>>>> asks for an asynchronous processing API. There are a few reasons for these
>>>>> asks:
>>>>>
>>>>>    - The users here are experienced in asynchronous programming. With
>>>>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>>>>    client, they are able to make remote calls efficiently and the libraries
>>>>>    help manage the execution threads underneath. Async remote calls are very
>>>>>    common in most of our streaming applications today.
>>>>>    - Many jobs are running on a multi-tenancy cluster. Async
>>>>>    processing helps reduce resource usage and speeds up computation
>>>>>    (fewer context switches).
>>>>>
>>>>> I asked about the async support in a previous email thread. The
>>>>> following API was mentioned in the reply:
>>>>>
>>>>>   new DoFn<InputT, OutputT>() {
>>>>>     @ProcessElement
>>>>>     public void process(@Element CompletionStage<InputT> element, ...)
>>>>> {
>>>>>       element.thenApply(...)
>>>>>     }
>>>>>   }
>>>>>
>>>>> We are wondering whether there are any discussions on this API and
>>>>> related docs. It is awesome that you guys already considered having DoFn to
>>>>> process asynchronously. Out of curiosity, this API seems to create a
>>>>> CompletionStage out of the input element (probably using the framework's
>>>>> executor) and then allow the user to chain on it. To us, it seems more
>>>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>>>>> CompletionStage<OutputT> to invoke upon completion.
>>>>>
>>>>> We would like to discuss the async API further, and hopefully we
>>>>> will have great support in Beam. Really appreciate the feedback!
>>>>>
>>>>> Thanks,
>>>>> Xinyu
>>>>>
>>>>

Re: [DISCUSSION] ParDo Async Java API

Posted by Reuven Lax <re...@google.com>.
On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xi...@gmail.com> wrote:

> @Steve: it's good to see that this is going to be useful in your use cases
> as well. Thanks for sharing the code from Scio! I can see in your
> implementation that waiting for the future completion is part of the
> @FinishBundle. We are thinking of taking advantage of the underlying runner
> async support so the user-level code won't need to implement this logic,
> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
> after future completion [1], and Flink also has an AsyncFunction API [2]
> which provides a ResultFuture similar to the API we discussed.
>

Can this be done correctly? What I mean is that if the process dies, can
you guarantee that no data is lost? Beam currently guarantees this for
FinishBundle, but if you use an arbitrary async framework this might not be
true.


> A simple use case for this is to execute a Runnable asynchronously in
> user's own executor. The following code illustrates Kenn's option #2, with
> a very simple single-thread pool being the executor:
>
> new DoFn<InputT, OutputT>() {
>   @ProcessElement
>   public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>         () -> someOutput,
>         Executors.newSingleThreadExecutor());
>     outputReceiver.output(future);
>   }
> }
>
> The neat thing about this API is that the user can choose their own async framework and we only expect the output to be a CompletionStage.
>
>
> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
>
> Thanks,
> Xinyu
>
>
> [1]
> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>
> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sn...@apache.org> wrote:
>
>> I'd love to see something like this as well.  Also +1 to process(@Element
>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>> don't know if there's much benefit to passing a future in, since the
>> framework itself could hook up the process function to complete when the
>> future completes.
>>
>> I feel like I've spent a bunch of time writing very similar "kick off a
>> future in ProcessElement, join it in FinishBundle" code, and looking around
>> Beam itself, a lot of built-in transforms do it as well.  Scio provides a
>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>> first-class concept in Beam itself.  Doing error handling, concurrency, etc.
>> correctly can be tricky.
>>
>> [1]
>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>
>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:
>>
>>> If the input is a CompletionStage<InputT> then the output should also be
>>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>>> could enforce this by giving the DoFn an
>>> OutputReceiver<CompletionStage<OutputT>>.
>>>
>>> Another possibility that might be even more robust against poor future
>>> use could be process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>> itself will be async chained, rather than counting on the user to do the
>>> right thing.
>>>
>>> We should see how these look in real use cases. The way that processing
>>> is split between @ProcessElement and @FinishBundle might complicate things.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com>
>>> wrote:
>>>
>>>> Hi, guys,
>>>>
>>>> As more users try out Beam running on the SamzaRunner, we have gotten a lot of
>>>> asks for an asynchronous processing API. There are a few reasons for these
>>>> asks:
>>>>
>>>>    - The users here are experienced in asynchronous programming. With
>>>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>>>    client, they are able to make remote calls efficiently and the libraries
>>>>    help manage the execution threads underneath. Async remote calls are very
>>>>    common in most of our streaming applications today.
>>>>    - Many jobs are running on a multi-tenancy cluster. Async
>>>>    processing helps reduce resource usage and speeds up computation
>>>>    (fewer context switches).
>>>>
>>>> I asked about the async support in a previous email thread. The
>>>> following API was mentioned in the reply:
>>>>
>>>>   new DoFn<InputT, OutputT>() {
>>>>     @ProcessElement
>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>       element.thenApply(...)
>>>>     }
>>>>   }
>>>>
>>>> We are wondering whether there are any discussions on this API and
>>>> related docs. It is awesome that you guys already considered having DoFn to
>>>> process asynchronously. Out of curiosity, this API seems to create a
>>>> CompletionStage out of the input element (probably using the framework's
>>>> executor) and then allow the user to chain on it. To us, it seems more
>>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>>>> CompletionStage<OutputT> to invoke upon completion.
>>>>
>>>> We would like to discuss the async API further, and hopefully we will
>>>> have great support in Beam. Really appreciate the feedback!
>>>>
>>>> Thanks,
>>>> Xinyu
>>>>
>>>

Re: [DISCUSSION] ParDo Async Java API

Posted by Xinyu Liu <xi...@gmail.com>.
@Steve: it's good to see that this is going to be useful in your use cases
as well. Thanks for sharing the code from Scio! I can see in your
implementation that waiting for the future completion is part of the
@FinishBundle. We are thinking of taking advantage of the underlying runner
async support so the user-level code won't need to implement this logic,
e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
after future completion [1], and Flink also has an AsyncFunction API [2]
which provides a ResultFuture similar to the API we discussed.
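
For reference, the Samza callback style looks roughly like the sketch
below. It is hand-written against the AsyncStreamTask javadocs in [1], so
treat the details as approximate; the lookup logic and stream names are
made up.

import java.util.concurrent.CompletableFuture;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.AsyncStreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCoordinator;

public class AsyncLookupTask implements AsyncStreamTask {
  // Made-up output stream for the sake of the example.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "output");

  @Override
  public void processAsync(IncomingMessageEnvelope envelope,
                           MessageCollector collector,
                           TaskCoordinator coordinator,
                           TaskCallback callback) {
    // Stand-in for a real non-blocking client call.
    CompletableFuture
        .supplyAsync(() -> "looked-up:" + envelope.getMessage())
        .whenComplete((result, error) -> {
          if (error != null) {
            callback.failure(error);   // the message will not be checkpointed
          } else {
            collector.send(new OutgoingMessageEnvelope(OUTPUT, result));
            callback.complete();       // Samza commits only after this
          }
        });
  }
}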

A simple use case for this is to execute a Runnable asynchronously in
user's own executor. The following code illustrates Kenn's option #2, with
a very simple single-thread pool being the executor:

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
        () -> someOutput,
        Executors.newSingleThreadExecutor());
    outputReceiver.output(future);
  }
}

The neat thing about this API is that the user can choose their own
async framework and we only expect the output to be a CompletionStage.


For the implementation of bundling, can we compose a CompletableFuture
from each element in the bundle, e.g. CompletableFuture.allOf(...),
and then invoke @FinishBundle when this future is complete? Seems this
might work.
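
Roughly along these lines (illustrative only; finishBundle and commitBundle
stand in for whatever the runner actually does at the end of a bundle):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BundleCompletionSketch {
  // How a runner might gate @FinishBundle and the commit on the
  // completion of every element's future.
  static CompletableFuture<Void> whenBundleDone(
      List<CompletableFuture<?>> elementFutures,
      Runnable finishBundle,
      Runnable commitBundle) {
    return CompletableFuture
        .allOf(elementFutures.toArray(new CompletableFuture[0]))
        .thenRun(finishBundle)   // every element output is resolved first
        .thenRun(commitBundle);  // only then may the bundle be committed
  }

  public static void main(String[] args) {
    List<CompletableFuture<?>> futures = Arrays.asList(
        CompletableFuture.supplyAsync(() -> "a"),
        CompletableFuture.supplyAsync(() -> "b"));
    whenBundleDone(futures,
        () -> System.out.println("finishBundle"),
        () -> System.out.println("commit")).join();
  }
}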

Thanks,
Xinyu


[1]
https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sn...@apache.org> wrote:

> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off a
> future in ProcessElement, join it in FinishBundle" code, and looking around
> Beam itself, a lot of built-in transforms do it as well.  Scio provides a
> few AsyncDoFn implementations [1] but it'd be great to see this as a
> first-class concept in Beam itself.  Doing error handling, concurrency, etc.
> correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:
>
>> If the input is a CompletionStage<InputT> then the output should also be
>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>> could enforce this by giving the DoFn an
>> OutputReceiver<CompletionStage<OutputT>>.
>>
>> Another possibility that might be even more robust against poor future
>> use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>>
>> We should see how these look in real use cases. The way that processing
>> is split between @ProcessElement and @FinishBundle might complicate things.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com> wrote:
>>
>>> Hi, guys,
>>>
>>> As more users try out Beam running on the SamzaRunner, we have gotten a lot of
>>> asks for an asynchronous processing API. There are a few reasons for these
>>> asks:
>>>
>>>    - The users here are experienced in asynchronous programming. With
>>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>>    client, they are able to make remote calls efficiently and the libraries
>>>    help manage the execution threads underneath. Async remote calls are very
>>>    common in most of our streaming applications today.
>>>    - Many jobs are running on a multi-tenancy cluster. Async processing
>>>    helps reduce resource usage and speeds up computation (fewer context
>>>    switches).
>>>
>>> I asked about the async support in a previous email thread. The
>>> following API was mentioned in the reply:
>>>
>>>   new DoFn<InputT, OutputT>() {
>>>     @ProcessElement
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>       element.thenApply(...)
>>>     }
>>>   }
>>>
>>> We are wondering whether there are any discussions on this API and
>>> related docs. It is awesome that you guys already considered having DoFn to
>>> process asynchronously. Out of curiosity, this API seems to create a
>>> CompletionStage out of the input element (probably using the framework's
>>> executor) and then allow the user to chain on it. To us, it seems more
>>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>>> CompletionStage<OutputT> to invoke upon completion.
>>>
>>> We would like to discuss the async API further, and hopefully we will
>>> have great support in Beam. Really appreciate the feedback!
>>>
>>> Thanks,
>>> Xinyu
>>>
>>

Re: [DISCUSSION] ParDo Async Java API

Posted by Steve Niemitz <sn...@apache.org>.
I'd love to see something like this as well.  Also +1 to process(@Element
InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I don't
know if there's much benefit to passing a future in, since the framework
itself could hook up the process function to complete when the future
completes.

I feel like I've spent a bunch of time writing very similar "kick off a
future in ProcessElement, join it in FinishBundle" code, and looking around
Beam itself, a lot of built-in transforms do it as well.  Scio provides a
few AsyncDoFn implementations [1] but it'd be great to see this as a
first-class concept in Beam itself.  Doing error handling, concurrency, etc.
correctly can be tricky.

[1]
https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
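
For anyone who hasn't written one of these, the pattern usually looks
something like this. This is a from-memory sketch, not Scio's actual code;
timeouts and error handling are omitted.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

public abstract class SimpleAsyncDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {

  // One pending result per element seen in the current bundle.
  private static class Pending<T> {
    final CompletableFuture<T> future;
    final Instant timestamp;
    final BoundedWindow window;

    Pending(CompletableFuture<T> future, Instant timestamp, BoundedWindow window) {
      this.future = future;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient List<Pending<OutputT>> pending;

  /** Kick off the asynchronous work for one element. */
  protected abstract CompletableFuture<OutputT> processAsync(InputT element);

  @StartBundle
  public void startBundle() {
    pending = new ArrayList<>();
  }

  @ProcessElement
  public void process(@Element InputT element,
                      @Timestamp Instant timestamp,
                      BoundedWindow window) {
    pending.add(new Pending<>(processAsync(element), timestamp, window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    for (Pending<OutputT> p : pending) {
      // Block until each result arrives, then emit it with the element's
      // original timestamp and window.
      context.output(p.future.join(), p.timestamp, p.window);
    }
    pending.clear();
  }
}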

On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <kl...@google.com> wrote:

> If the input is a CompletionStage<InputT> then the output should also be a
> CompletionStage<OutputT>, since all you should do is async chaining. We
> could enforce this by giving the DoFn an
> OutputReceiver<CompletionStage<OutputT>>.
>
> Another possibility that might be even more robust against poor future use
> could be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
>
> We should see how these look in real use cases. The way that processing is
> split between @ProcessElement and @FinishBundle might complicate things.
>
> Kenn
>
> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com> wrote:
>
>> Hi, guys,
>>
>> As more users try out Beam running on the SamzaRunner, we have gotten a lot of
>> asks for an asynchronous processing API. There are a few reasons for these
>> asks:
>>
>>    - The users here are experienced in asynchronous programming. With
>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>    client, they are able to make remote calls efficiently and the libraries
>>    help manage the execution threads underneath. Async remote calls are very
>>    common in most of our streaming applications today.
>>    - Many jobs are running on a multi-tenancy cluster. Async processing
>>    helps reduce resource usage and speeds up computation (fewer context
>>    switches).
>>
>> I asked about the async support in a previous email thread. The following
>> API was mentioned in the reply:
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...)
>>     }
>>   }
>>
>> We are wondering whether there are any discussions on this API and
>> related docs. It is awesome that you guys already considered having DoFn to
>> process asynchronously. Out of curiosity, this API seems to create a
>> CompletionStage out of the input element (probably using the framework's
>> executor) and then allow the user to chain on it. To us, it seems more
>> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
>> CompletionStage<OutputT> to invoke upon completion.
>>
>> We would like to discuss the async API further, and hopefully we will
>> have great support in Beam. Really appreciate the feedback!
>>
>> Thanks,
>> Xinyu
>>
>

Re: [DISCUSSION] ParDo Async Java API

Posted by Kenneth Knowles <kl...@google.com>.
If the input is a CompletionStage<InputT> then the output should also be a
CompletionStage<OutputT>, since all you should do is async chaining. We
could enforce this by giving the DoFn an
OutputReceiver<CompletionStage<OutputT>>.

Another possibility that might be even more robust against poor future use
could be process(@Element InputT element, @Output
OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
itself will be async chained, rather than counting on the user to do the
right thing.
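
For comparison, the first option would read something like this. This is a
hypothetical API, not something Beam has today, and transform is a made-up
synchronous function:

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(
      @Element CompletionStage<InputT> element,
      @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    // Pure async chaining; the process method itself never blocks.
    outputReceiver.output(element.thenApply(in -> transform(in)));
  }
}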

We should see how these look in real use cases. The way that processing is
split between @ProcessElement and @FinishBundle might complicate things.

Kenn

On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xi...@gmail.com> wrote:

> Hi, guys,
>
> As more users try out Beam running on the SamzaRunner, we have gotten a lot of
> asks for an asynchronous processing API. There are a few reasons for these
> asks:
>
>    - The users here are experienced in asynchronous programming. With
>    async frameworks such as Netty and ParSeq and libs like async jersey
>    client, they are able to make remote calls efficiently and the libraries
>    help manage the execution threads underneath. Async remote calls are very
>    common in most of our streaming applications today.
>    - Many jobs are running on a multi-tenancy cluster. Async processing
>    helps reduce resource usage and speeds up computation (fewer context
>    switches).
>
> I asked about the async support in a previous email thread. The following
> API was mentioned in the reply:
>
>   new DoFn<InputT, OutputT>() {
>     @ProcessElement
>     public void process(@Element CompletionStage<InputT> element, ...) {
>       element.thenApply(...)
>     }
>   }
>
> We are wondering whether there are any discussions on this API and related
> docs. It is awesome that you guys already considered having DoFn to process
> asynchronously. Out of curiosity, this API seems to create a
> CompletionStage out of the input element (probably using the framework's
> executor) and then allow the user to chain on it. To us, it seems more
> convenient if the DoFn outputs a CompletionStage<OutputT> or is passed a
> CompletionStage<OutputT> to invoke upon completion.
>
> We would like to discuss the async API further, and hopefully we will
> have great support in Beam. Really appreciate the feedback!
>
> Thanks,
> Xinyu
>